vortex_serde/chunked_reader/take_rows.rs

use std::ops::Range;

use bytes::BytesMut;
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_array::array::{ChunkedArray, PrimitiveArray};
use vortex_array::compute::unary::{subtract_scalar, try_cast};
use vortex_array::compute::{search_sorted, slice, take, SearchSortedSide};
use vortex_array::stats::ArrayStatistics;
use vortex_array::stream::{ArrayStream, ArrayStreamExt};
use vortex_array::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_dtype::PType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::chunked_reader::ChunkedArrayReader;
use crate::io::VortexReadAt;
use crate::stream_reader::StreamArrayReader;
impl<R: VortexReadAt> ChunkedArrayReader<R> {
    pub async fn take_rows(&mut self, indices: &Array) -> VortexResult<Array> {
        // Figure out whether the row indices are strictly sorted (i.e. sorted and
        // unique). If not, we would need to sort and deduplicate them first.
        if indices
            .statistics()
            .compute_is_strict_sorted()
            .unwrap_or(false)
        {
            // With strict-sorted indices, we can take the rows directly.
            return self.take_rows_strict_sorted(indices).await;
        }

        // For the unsorted path we would need to:
        //  * figure out which chunks are relevant to the read operation using the
        //    row_offsets array (depending on whether there are more indices than
        //    chunks, we may wish to perform this join differently);
        //  * coalesce the chunks we care about by some metric;
        //  * read the relevant chunks;
        //    TODO(ngates): we could support read_into for array builders since we
        //     know the size of the result.
        //  * reshuffle the result as per the original sort order.
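        //
        // A hedged sketch of that path, assuming u64 indices (`indices_slice` is a
        // hypothetical name for the indices flattened to a `&[u64]`), using only
        // `take` from vortex_array::compute:
        //
        //   // 1. Argsort the (deduplicated) indices to obtain a permutation.
        //   let mut perm: Vec<u64> = (0..indices.len() as u64).collect();
        //   perm.sort_by_key(|&p| indices_slice[p as usize]);
        //   // 2. Gather the indices into sorted order and take those rows.
        //   let sorted = take(indices, &PrimitiveArray::from(perm.clone()).into_array())?;
        //   let rows = self.take_rows_strict_sorted(&sorted).await?;
        //   // 3. Invert `perm`, then `take` from `rows` to restore the caller's order.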
        unimplemented!("Unsorted 'take' operation is not supported yet")
    }

    /// Take rows from a chunked array given strict-sorted indices.
    ///
    /// The best strategy depends on the quantity and distribution of the indices.
    ///
    /// For now, we find the relevant chunks, coalesce them, and read.
    async fn take_rows_strict_sorted(&mut self, indices: &Array) -> VortexResult<Array> {
        // Figure out which chunks are relevant.
        let chunk_idxs = find_chunks(&self.row_offsets, indices)?;
        // Coalesce the chunks that we're going to read from.
        let coalesced_chunks = self.coalesce_chunks(chunk_idxs.as_ref());

        let mut start_chunks: Vec<u32> = Vec::with_capacity(coalesced_chunks.len());
        let mut stop_chunks: Vec<u32> = Vec::with_capacity(coalesced_chunks.len());
        for (i, chunks) in coalesced_chunks.iter().enumerate() {
            start_chunks.push(
                chunks
                    .first()
                    .ok_or_else(|| vortex_err!("Coalesced chunk {i} cannot be empty"))?
                    .chunk_idx,
            );
            stop_chunks.push(
                chunks
                    .last()
                    .ok_or_else(|| vortex_err!("Coalesced chunk {i} cannot be empty"))?
                    .chunk_idx
                    + 1,
            );
        }

        // Grab the row and byte offsets for each chunk range.
        let start_chunks = PrimitiveArray::from(start_chunks).into_array();
        let start_rows = take(&self.row_offsets, &start_chunks)?.into_primitive()?;
        let start_bytes = take(&self.byte_offsets, &start_chunks)?.into_primitive()?;

        let stop_chunks = PrimitiveArray::from(stop_chunks).into_array();
        let stop_rows = take(&self.row_offsets, &stop_chunks)?.into_primitive()?;
        let stop_bytes = take(&self.byte_offsets, &stop_chunks)?.into_primitive()?;

        // For each chunk range, read the data as an ArrayStream and call take on it.
        let chunks = stream::iter(0..coalesced_chunks.len())
            .map(|chunk_idx| {
                let (start_byte, stop_byte) = (
                    start_bytes.get_as_cast::<u64>(chunk_idx),
                    stop_bytes.get_as_cast::<u64>(chunk_idx),
                );
                let (start_row, stop_row) = (
                    start_rows.get_as_cast::<u64>(chunk_idx),
                    stop_rows.get_as_cast::<u64>(chunk_idx),
                );
                self.take_from_chunk(indices, start_byte..stop_byte, start_row..stop_row)
            })
            // Read up to 10 coalesced ranges concurrently.
            .buffered(10)
            .try_flatten()
            .try_collect()
            .await?;

        Ok(ChunkedArray::try_new(chunks, (*self.dtype).clone())?.into_array())
    }

    /// Coalesce reads for the given chunks.
    ///
    /// This depends on a few factors:
    /// * The number of bytes between adjacent selected chunks.
    /// * The latency of the underlying storage.
    /// * The throughput of the underlying storage.
    fn coalesce_chunks(&self, chunk_idxs: &[ChunkIndices]) -> Vec<Vec<ChunkIndices>> {
        // TODO: use the read's performance hint to coalesce adjacent chunks; a sketch
        //  of gap-based coalescing follows below. For now, every chunk is read
        //  individually.
        let _hint = self.read.performance_hint();
        chunk_idxs
            .iter()
            .map(|chunk_idx| vec![chunk_idx.clone()])
            .collect_vec()
    }
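
    /// A hedged sketch of the gap-based coalescing mentioned above, not yet wired
    /// into `coalesce_chunks`: merge runs of chunks whenever the byte gap between
    /// them is small enough that over-reading is cheaper than issuing another
    /// request. `COALESCE_GAP_BYTES` is an assumed constant; a real implementation
    /// would derive the threshold from the read's performance hint. `byte_offsets`
    /// is the chunk byte-offset table flattened to a slice.
    #[allow(dead_code)]
    fn coalesce_by_gap(
        chunk_idxs: &[ChunkIndices],
        byte_offsets: &[u64],
    ) -> Vec<Vec<ChunkIndices>> {
        // Assumption: gaps under ~1MiB are worth reading through rather than
        // seeking past.
        const COALESCE_GAP_BYTES: u64 = 1 << 20;

        let mut groups: Vec<Vec<ChunkIndices>> = Vec::new();
        for chunk in chunk_idxs {
            if let Some(group) = groups.last_mut() {
                // Byte gap between the end of the last chunk in the current group
                // and the start of the candidate chunk.
                let prev_end = group
                    .last()
                    .map(|c| byte_offsets[(c.chunk_idx + 1) as usize])
                    .unwrap_or(0);
                let gap = byte_offsets[chunk.chunk_idx as usize].saturating_sub(prev_end);
                if gap <= COALESCE_GAP_BYTES {
                    group.push(chunk.clone());
                    continue;
                }
            }
            groups.push(vec![chunk.clone()]);
        }
        groups
    }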

    async fn take_from_chunk(
        &self,
        indices: &Array,
        byte_range: Range<u64>,
        row_range: Range<u64>,
    ) -> VortexResult<impl ArrayStream> {
        let range_byte_len = (byte_range.end - byte_range.start) as usize;

        // Relativize the indices to this range of chunks.
        let indices_start =
            search_sorted(indices, row_range.start, SearchSortedSide::Left)?.to_index();
        let indices_stop =
            search_sorted(indices, row_range.end, SearchSortedSide::Right)?.to_index();
        let relative_indices = slice(indices, indices_start, indices_stop)?;
        let row_start_scalar = Scalar::from(row_range.start).cast(relative_indices.dtype())?;
        let relative_indices = subtract_scalar(&relative_indices, &row_start_scalar)?;

        // Set up an array reader to read this range of chunks.
        let mut buffer = BytesMut::with_capacity(range_byte_len);
        // SAFETY: the uninitialized bytes are fully overwritten by `read_at_into` below.
        unsafe { buffer.set_len(range_byte_len) }
        // TODO(ngates): instead of reading the whole range into a buffer, we should stream
        //  the byte range (e.g. if it's coming from an HTTP endpoint) and wrap that with a
        //  MessageReader.
        let buffer = self.read.read_at_into(byte_range.start, buffer).await?;

        let reader = StreamArrayReader::try_new(buffer, self.context.clone())
            .await?
            .with_dtype(self.dtype.clone());

        // Take the relativized indices from the stream.
        reader.into_array_stream().take_rows(relative_indices)
    }
}

/// Find the chunks that are relevant to the read operation.
///
/// Both the `row_offsets` and `indices` arrays must be strict-sorted.
fn find_chunks(row_offsets: &Array, indices: &Array) -> VortexResult<Vec<ChunkIndices>> {
    // TODO(ngates): lots of optimizations to be had here, potentially lots of push-down.
    //  For now, we just flatten everything into primitive arrays and iterate.
    let row_offsets = try_cast(row_offsets, PType::U64.into())?.into_primitive()?;
    let indices = try_cast(indices, PType::U64.into())?.into_primitive()?;

    // The last index must fall within the total row count (the final row offset).
    if let (Some(last_idx), Some(num_rows)) = (
        indices.maybe_null_slice::<u64>().last(),
        row_offsets.maybe_null_slice::<u64>().last(),
    ) {
        if last_idx >= num_rows {
            vortex_bail!("Index {} out of bounds: the array only has {} rows", last_idx, num_rows);
        }
    }

    let mut chunks = HashMap::new();

    for (pos, idx) in indices.maybe_null_slice::<u64>().iter().enumerate() {
        // Find the chunk containing this index: the right-most row offset <= idx.
        let chunk_idx = search_sorted(row_offsets.as_ref(), *idx, SearchSortedSide::Right)?
            .to_ends_index(row_offsets.len())
            .saturating_sub(1);
        chunks
            .entry(chunk_idx as u32)
            .and_modify(|chunk_indices: &mut ChunkIndices| {
                chunk_indices.indices_stop = (pos + 1) as u64;
            })
            .or_insert(ChunkIndices {
                chunk_idx: chunk_idx as u32,
                indices_start: pos as u64,
                indices_stop: (pos + 1) as u64,
            });
    }

    // Return the chunk ranges in ascending chunk order.
    Ok(chunks
        .keys()
        .sorted()
        .map(|k| &chunks[k])
        .cloned()
        .collect_vec())
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ChunkIndices {
    chunk_idx: u32,
    // The range of positions into the indices array that is covered by this chunk.
    indices_start: u64,
    indices_stop: u64,
}

#[cfg(test)]
#[allow(clippy::panic_in_result_fn)]
mod test {
    use std::io::Cursor;
    use std::sync::Arc;

    use futures_executor::block_on;
    use itertools::Itertools;
    use vortex_array::array::{ChunkedArray, PrimitiveArray};
    use vortex_array::{Context, IntoArray, IntoArrayVariant};
    use vortex_buffer::Buffer;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;

    use crate::chunked_reader::ChunkedArrayReader;
    use crate::stream_writer::StreamArrayWriter;
    use crate::MessageReader;

    /// Write a chunked array of ten i32 chunks, each counting 0..1000.
    fn chunked_array() -> VortexResult<StreamArrayWriter<Vec<u8>>> {
        let c = ChunkedArray::try_new(
            vec![PrimitiveArray::from((0i32..1000).collect_vec()).into_array(); 10],
            PType::I32.into(),
        )?
        .into_array();

        block_on(async { StreamArrayWriter::new(vec![]).write_array(c).await })
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_take_rows() -> VortexResult<()> {
        let writer = chunked_array()?;

        let array_layout = writer.array_layouts()[0].clone();
        let byte_offsets = PrimitiveArray::from(array_layout.chunks.byte_offsets.clone());
        let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone());

        let buffer = Buffer::from(writer.into_inner());

        let mut msgs =
            block_on(async { MessageReader::try_new(Cursor::new(buffer.clone())).await })?;
        let dtype = Arc::new(block_on(async { msgs.read_dtype().await })?);

        let mut reader = ChunkedArrayReader::try_new(
            buffer,
            Arc::new(Context::default()),
            dtype,
            byte_offsets.into_array(),
            row_offsets.into_array(),
        )
        .unwrap();

        // Take the first row, a row from the first chunk, and the last row overall
        // (which holds 999, since every chunk counts 0..1000).
        let result = block_on(async {
            reader
                .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array())
                .await
        })?
        .into_primitive()?;

        assert_eq!(result.len(), 3);
        assert_eq!(result.maybe_null_slice::<i32>(), &[0, 10, 999]);
        Ok(())
    }
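
    /// A minimal sanity-check sketch for `find_chunks`, assuming the row-offset
    /// semantics used above: offsets `[0, 1000, 2000]` describe two chunks of
    /// 1000 rows each.
    #[test]
    fn test_find_chunks() -> VortexResult<()> {
        let row_offsets = PrimitiveArray::from(vec![0u64, 1000, 2000]).into_array();
        let indices = PrimitiveArray::from(vec![10u64, 100, 1500]).into_array();

        let chunks = super::find_chunks(&row_offsets, &indices)?;

        // Positions 0..2 of `indices` land in chunk 0; position 2 lands in chunk 1.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].chunk_idx, 0);
        assert_eq!(chunks[1].chunk_idx, 1);
        Ok(())
    }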
}