// vortex_serde/chunked_reader/take_rows.rs

use std::ops::Range;

use bytes::BytesMut;
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_array::array::{ChunkedArray, PrimitiveArray};
use vortex_array::compute::unary::{subtract_scalar, try_cast};
use vortex_array::compute::{search_sorted, slice, take, SearchSortedSide};
use vortex_array::stats::ArrayStatistics;
use vortex_array::stream::{ArrayStream, ArrayStreamExt};
use vortex_array::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_dtype::PType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::chunked_reader::ChunkedArrayReader;
use crate::io::VortexReadAt;
use crate::stream_reader::StreamArrayReader;

impl<R: VortexReadAt> ChunkedArrayReader<R> {
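    /// Take the rows at `indices` from the chunked array. Only strictly
    /// sorted indices are supported for now, since they let each chunk be
    /// visited at most once and in order.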
    pub async fn take_rows(&mut self, indices: &Array) -> VortexResult<Array> {
        if indices
            .statistics()
            .compute_is_strict_sorted()
            .unwrap_or(false)
        {
            return self.take_rows_strict_sorted(indices).await;
        }

        unimplemented!("Unsorted 'take' operation is not supported yet")
    }

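    /// Take rows assuming `indices` is strictly sorted: find the chunks the
    /// indices touch, coalesce them into contiguous ranges, then fetch and
    /// filter each range concurrently.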
    async fn take_rows_strict_sorted(&mut self, indices: &Array) -> VortexResult<Array> {
        let chunk_idxs = find_chunks(&self.row_offsets, indices)?;
        let coalesced_chunks = self.coalesce_chunks(chunk_idxs.as_ref());

        let mut start_chunks: Vec<u32> = Vec::with_capacity(coalesced_chunks.len());
        let mut stop_chunks: Vec<u32> = Vec::with_capacity(coalesced_chunks.len());
        for (i, chunks) in coalesced_chunks.iter().enumerate() {
            start_chunks.push(
                chunks
                    .first()
                    .ok_or_else(|| vortex_err!("Coalesced chunk {i} cannot be empty"))?
                    .chunk_idx,
            );
            stop_chunks.push(
                chunks
                    .last()
                    .ok_or_else(|| vortex_err!("Coalesced chunk {i} cannot be empty"))?
                    .chunk_idx
                    + 1,
            );
        }

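        // Translate the start/stop chunk ids into row and byte offsets by
        // gathering from the offset arrays.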
        let start_chunks = PrimitiveArray::from(start_chunks).into_array();
        let start_rows = take(&self.row_offsets, &start_chunks)?.into_primitive()?;
        let start_bytes = take(&self.byte_offsets, &start_chunks)?.into_primitive()?;

        let stop_chunks = PrimitiveArray::from(stop_chunks).into_array();
        let stop_rows = take(&self.row_offsets, &stop_chunks)?.into_primitive()?;
        let stop_bytes = take(&self.byte_offsets, &stop_chunks)?.into_primitive()?;

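        // Fetch and filter each coalesced range, keeping up to 10 reads in
        // flight at once.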
        let chunks = stream::iter(0..coalesced_chunks.len())
            .map(|chunk_idx| {
                let (start_byte, stop_byte) = (
                    start_bytes.get_as_cast::<u64>(chunk_idx),
                    stop_bytes.get_as_cast::<u64>(chunk_idx),
                );
                let (start_row, stop_row) = (
                    start_rows.get_as_cast::<u64>(chunk_idx),
                    stop_rows.get_as_cast::<u64>(chunk_idx),
                );
                self.take_from_chunk(indices, start_byte..stop_byte, start_row..stop_row)
            })
            .buffered(10)
            .try_flatten()
            .try_collect()
            .await?;

        Ok(ChunkedArray::try_new(chunks, (*self.dtype).clone())?.into_array())
    }

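    /// Group nearby chunks so each group can be fetched with a single read.
    /// Currently a no-op: every chunk ends up in its own group. The reader's
    /// performance hint is fetched but not yet used to merge adjacent chunks.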
    fn coalesce_chunks(&self, chunk_idxs: &[ChunkIndices]) -> Vec<Vec<ChunkIndices>> {
        let _hint = self.read.performance_hint();
        chunk_idxs
            .iter()
            .map(|chunk_idx| vec![chunk_idx.clone()])
            .collect_vec()
    }

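    /// Read `byte_range` from the underlying source and take the subset of
    /// `indices` that falls within `row_range`.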
    async fn take_from_chunk(
        &self,
        indices: &Array,
        byte_range: Range<u64>,
        row_range: Range<u64>,
    ) -> VortexResult<impl ArrayStream> {
        let range_byte_len = (byte_range.end - byte_range.start) as usize;

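        // Narrow the indices to this row range and rebase them so they are
        // relative to the first row of the range.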
        let indices_start =
            search_sorted(indices, row_range.start, SearchSortedSide::Left)?.to_index();
        let indices_stop =
            search_sorted(indices, row_range.end, SearchSortedSide::Right)?.to_index();
        let relative_indices = slice(indices, indices_start, indices_stop)?;
        let row_start_scalar = Scalar::from(row_range.start).cast(relative_indices.dtype())?;
        let relative_indices = subtract_scalar(&relative_indices, &row_start_scalar)?;

        let mut buffer = BytesMut::with_capacity(range_byte_len);
        // SAFETY: read_at_into is expected to overwrite the entire buffer, so
        // the uninitialized bytes are never observed.
        unsafe { buffer.set_len(range_byte_len) }
        let buffer = self.read.read_at_into(byte_range.start, buffer).await?;

        let reader = StreamArrayReader::try_new(buffer, self.context.clone())
            .await?
            .with_dtype(self.dtype.clone());

        reader.into_array_stream().take_rows(relative_indices)
    }
}

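/// For each chunk containing at least one of the (sorted) `indices`, record
/// the chunk index together with the range of positions in `indices` that
/// fall inside that chunk.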
fn find_chunks(row_offsets: &Array, indices: &Array) -> VortexResult<Vec<ChunkIndices>> {
    let row_offsets = try_cast(row_offsets, PType::U64.into())?.into_primitive()?;
    let indices = try_cast(indices, PType::U64.into())?.into_primitive()?;

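    // The indices are sorted, so bounds-checking the last one covers them all.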
    if let (Some(last_idx), Some(num_rows)) = (
        indices.maybe_null_slice::<u64>().last(),
        row_offsets.maybe_null_slice::<u64>().last(),
    ) {
        if last_idx >= num_rows {
            vortex_bail!("Index {} out of bounds {}", last_idx, num_rows);
        }
    }

    let mut chunks = HashMap::new();

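    // Binary-search each index into the row offsets to find its chunk, then
    // grow that chunk's position range as later indices land in it.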
    for (pos, idx) in indices.maybe_null_slice::<u64>().iter().enumerate() {
        let chunk_idx = search_sorted(row_offsets.as_ref(), *idx, SearchSortedSide::Right)?
            .to_ends_index(row_offsets.len())
            .saturating_sub(1);
        chunks
            .entry(chunk_idx as u32)
            .and_modify(|chunk_indices: &mut ChunkIndices| {
                chunk_indices.indices_stop = (pos + 1) as u64;
            })
            .or_insert(ChunkIndices {
                chunk_idx: chunk_idx as u32,
                indices_start: pos as u64,
                indices_stop: (pos + 1) as u64,
            });
    }

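    // Return the touched chunks in ascending chunk order.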
    Ok(chunks
        .keys()
        .sorted()
        .map(|k| &chunks[k])
        .cloned()
        .collect_vec())
}

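/// A chunk id together with the half-open range of positions in the index
/// array that land in that chunk.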
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct ChunkIndices {
    chunk_idx: u32,
    indices_start: u64,
    indices_stop: u64,
}

#[cfg(test)]
#[allow(clippy::panic_in_result_fn)]
mod test {
    use std::io::Cursor;
    use std::sync::Arc;

    use futures_executor::block_on;
    use itertools::Itertools;
    use vortex_array::array::{ChunkedArray, PrimitiveArray};
    use vortex_array::{Context, IntoArray, IntoArrayVariant};
    use vortex_buffer::Buffer;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;

    use crate::chunked_reader::ChunkedArrayReader;
    use crate::stream_writer::StreamArrayWriter;
    use crate::MessageReader;

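    /// Write a chunked array of ten 1000-element i32 chunks (10,000 rows in
    /// total) and return the writer so the test can inspect its layouts.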
    fn chunked_array() -> VortexResult<StreamArrayWriter<Vec<u8>>> {
        let c = ChunkedArray::try_new(
            vec![PrimitiveArray::from((0i32..1000).collect_vec()).into_array(); 10],
            PType::I32.into(),
        )?
        .into_array();

        block_on(async { StreamArrayWriter::new(vec![]).write_array(c).await })
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_take_rows() -> VortexResult<()> {
        let writer = chunked_array()?;

        let array_layout = writer.array_layouts()[0].clone();
        let byte_offsets = PrimitiveArray::from(array_layout.chunks.byte_offsets.clone());
        let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone());

        let buffer = Buffer::from(writer.into_inner());

        let mut msgs =
            block_on(async { MessageReader::try_new(Cursor::new(buffer.clone())).await })?;
        let dtype = Arc::new(block_on(async { msgs.read_dtype().await })?);

        let mut reader = ChunkedArrayReader::try_new(
            buffer,
            Arc::new(Context::default()),
            dtype,
            byte_offsets.into_array(),
            row_offsets.into_array(),
        )
        .unwrap();

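        // Take rows from the first chunk (0 and 10) and the very last row of
        // the final chunk (9_999), which crosses a chunk boundary.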
        let result = block_on(async {
            reader
                .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array())
                .await
        })?
        .into_primitive()?;

        assert_eq!(result.len(), 3);
        assert_eq!(result.maybe_null_slice::<i32>(), &[0, 10, 999]);
        Ok(())
    }
}