1use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::session::ArraySessionExt;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::VortexReadAt;
16use vortex_io::session::RuntimeSessionExt;
17use vortex_layout::segments::InstrumentedSegmentCache;
18use vortex_layout::segments::NoOpSegmentCache;
19use vortex_layout::segments::SegmentCache;
20use vortex_layout::segments::SegmentCacheSourceAdapter;
21use vortex_layout::segments::SegmentId;
22use vortex_layout::segments::SharedSegmentSource;
23use vortex_layout::session::LayoutSessionExt;
24use vortex_metrics::DefaultMetricsRegistry;
25use vortex_metrics::Label;
26use vortex_metrics::MetricsRegistry;
27use vortex_session::VortexSession;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::DeserializeStep;
31use crate::EOF_SIZE;
32use crate::MAX_POSTSCRIPT_SIZE;
33use crate::VortexFile;
34use crate::footer::Footer;
35use crate::segments::FileSegmentSource;
36use crate::segments::InitialReadSegmentCache;
37use crate::segments::RequestMetrics;
38
39const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
40
41pub struct VortexOpenOptions {
43 session: VortexSession,
45 segment_cache: Option<Arc<dyn SegmentCache>>,
47 initial_read_size: usize,
49 file_size: Option<u64>,
51 dtype: Option<DType>,
53 footer: Option<Footer>,
55 initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
57 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
59 labels: Vec<Label>,
61}
62
63pub trait OpenOptionsSessionExt: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt {
64 fn open_options(&self) -> VortexOpenOptions {
66 VortexOpenOptions {
67 session: self.session(),
68 segment_cache: None,
69 initial_read_size: INITIAL_READ_SIZE,
70 file_size: None,
71 dtype: None,
72 footer: None,
73 initial_read_segments: Default::default(),
74 metrics_registry: None,
75 labels: Vec::default(),
76 }
77 }
78}
79impl<S: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt> OpenOptionsSessionExt for S {}
80
81impl VortexOpenOptions {
82 pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
84 self.initial_read_size = initial_read_size;
85 self
86 }
87
88 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
90 self.segment_cache = Some(segment_cache);
91 self
92 }
93
94 pub fn with_file_size(mut self, file_size: u64) -> Self {
99 self.file_size = Some(file_size);
100 self
101 }
102
103 pub fn with_some_file_size(mut self, file_size: Option<u64>) -> Self {
108 self.file_size = file_size;
109 self
110 }
111
112 pub fn with_dtype(mut self, dtype: DType) -> Self {
118 self.dtype = Some(dtype);
119 self
120 }
121
122 pub fn with_footer(mut self, footer: Footer) -> Self {
127 self.dtype = Some(footer.layout().dtype().clone());
128 self.footer = Some(footer);
129 self
130 }
131
132 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
134 self.metrics_registry = Some(metrics);
135 self
136 }
137
138 pub fn with_labels(mut self, labels: Vec<Label>) -> Self {
140 self.labels.extend(labels);
141 self
142 }
143
144 pub async fn open(self, source: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
151 self.open_read(source).await
152 }
153
154 #[cfg(not(target_arch = "wasm32"))]
156 pub async fn open_path(self, path: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
157 use vortex_io::std_file::FileReadAt;
158 let handle = self.session.handle();
159 let source = Arc::new(FileReadAt::open(path, handle)?);
160 self.open(source).await
161 }
162
163 pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
165 block_on(self.with_initial_read_size(0).open_read(buffer.into()))
167 }
168
169 pub async fn open_read<R: VortexReadAt + Clone>(self, reader: R) -> VortexResult<VortexFile> {
171 let segment_cache = self
172 .segment_cache
173 .clone()
174 .unwrap_or_else(|| Arc::new(NoOpSegmentCache));
175
176 let metrics_registry = self
177 .metrics_registry
178 .clone()
179 .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default()));
180
181 let footer = if let Some(footer) = self.footer {
182 footer
183 } else {
184 self.read_footer(&reader).await?
185 };
186
187 let segment_cache = Arc::new(InstrumentedSegmentCache::new(
188 InitialReadSegmentCache {
189 initial: self.initial_read_segments,
190 fallback: segment_cache,
191 },
192 metrics_registry.as_ref(),
193 self.labels.clone(),
194 ));
195
196 let metrics = RequestMetrics::new(metrics_registry.as_ref(), self.labels);
197
198 let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open(
200 footer.segment_map().clone(),
201 reader,
202 self.session.handle(),
203 metrics,
204 )));
205
206 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
208 segment_cache,
209 segment_source,
210 ));
211
212 Ok(VortexFile {
213 footer,
214 segment_source,
215 session: self.session.clone(),
216 })
217 }
218
219 async fn read_footer(&self, read: &dyn VortexReadAt) -> VortexResult<Footer> {
220 let file_size = match self.file_size {
222 None => read.size().await?,
223 Some(file_size) => file_size,
224 };
225 let mut initial_read_size = self
226 .initial_read_size
227 .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
229 if let Ok(file_size) = usize::try_from(file_size) {
230 initial_read_size = initial_read_size.min(file_size);
231 }
232
233 let initial_offset = file_size - initial_read_size as u64;
234 let initial_read: ByteBuffer = read
235 .read_at(initial_offset, initial_read_size, Alignment::none())
236 .await?
237 .try_into_host()?
238 .await?;
239
240 let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
241 .with_size(file_size)
242 .with_some_dtype(self.dtype.clone());
243
244 let footer = loop {
245 match deserializer.deserialize()? {
246 DeserializeStep::NeedMoreData { offset, len } => {
247 let more_data = read
248 .read_at(offset, len, Alignment::none())
249 .await?
250 .try_into_host()?
251 .await?;
252 deserializer.prefix_data(more_data);
253 }
254 DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
255 DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
256 }
257 }?;
258
259 let initial_offset = file_size - (deserializer.buffer().len() as u64);
262 self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
263
264 Ok(footer)
265 }
266
267 fn populate_initial_segments(
269 &self,
270 initial_offset: u64,
271 initial_read: &ByteBuffer,
272 footer: &Footer,
273 ) {
274 let first_idx = footer
275 .segment_map()
276 .partition_point(|segment| segment.offset < initial_offset);
277
278 let mut initial_read_segments = self.initial_read_segments.write();
279
280 for idx in first_idx..footer.segment_map().len() {
281 let segment = &footer.segment_map()[idx];
282 let segment_id =
283 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
284 let offset =
285 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
286 let buffer = initial_read
287 .slice(offset..offset + (segment.length as usize))
288 .aligned(segment.alignment);
289 initial_read_segments.insert(segment_id, buffer);
290 }
291 }
292}
293
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Opens the object at `path` in `object_store` as a Vortex file.
    ///
    /// Builds an `ObjectStoreReadAt` from the store, the path and this session's runtime
    /// handle, and delegates to [`VortexOpenOptions::open`].
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::object_store::ObjectStoreReadAt;

        let runtime_handle = self.session.handle();
        let reader = ObjectStoreReadAt::new(Arc::clone(object_store), path.into(), runtime_handle);
        self.open(Arc::new(reader)).await
    }
}
312
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use futures::future::BoxFuture;
    use vortex_array::IntoArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::expr::session::ExprSession;
    use vortex_array::session::ArraySession;
    use vortex_buffer::Buffer;
    use vortex_buffer::ByteBufferMut;
    use vortex_dtype::session::DTypeSession;
    use vortex_io::session::RuntimeSession;
    use vortex_layout::session::LayoutSession;

    use super::*;
    use crate::WriteOptionsSessionExt;

    /// Reader wrapper that records how many bytes were requested in total and the length of
    /// the first read that was issued.
    #[derive(Clone)]
    struct CountingRead<R> {
        /// The reader all calls are forwarded to.
        inner: R,
        /// Running sum of the `length` of every `read_at` call.
        total_read: Arc<AtomicUsize>,
        /// Length of the first `read_at` call; only written while it still holds `0`.
        first_read_len: Arc<AtomicUsize>,
    }

    impl<R: VortexReadAt + Clone> VortexReadAt for CountingRead<R> {
        fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
            self.inner.size()
        }

        fn read_at(
            &self,
            offset: u64,
            length: usize,
            alignment: Alignment,
        ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
            self.total_read.fetch_add(length, Ordering::Relaxed);
            // Only succeeds while the slot is still zero; a failed exchange is expected and
            // deliberately ignored.
            let _ = self.first_read_len.compare_exchange(
                0,
                length,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
            self.inner.read_at(offset, length, alignment)
        }

        fn concurrency(&self) -> usize {
            self.inner.concurrency()
        }
    }

    /// Opening a file larger than 1 MiB with default options must start with a read of exactly
    /// the postscript-plus-EOF size and must request less than 1 MiB overall.
    #[tokio::test]
    async fn test_initial_read_size() {
        const ONE_MIB: usize = 1024 * 1024;

        let mut session = VortexSession::empty()
            .with::<DTypeSession>()
            .with::<ArraySession>()
            .with::<LayoutSession>()
            .with::<ExprSession>()
            .with::<RuntimeSession>();
        crate::register_default_encodings(&mut session);

        // Alternating-sign integers, enough of them to produce a file above 1 MiB.
        let values = (0i32..1_500_000)
            .map(|i| if i % 2 == 0 { i } else { -i })
            .collect::<Vec<i32>>();
        let array = Buffer::from(values).into_array();

        let mut sink = ByteBufferMut::empty();
        session
            .write_options()
            .write(&mut sink, array.to_array_stream())
            .await
            .unwrap();

        let file_bytes = ByteBuffer::from(sink);
        assert!(
            file_bytes.len() > ONE_MIB,
            "Buffer length is only {} bytes",
            file_bytes.len()
        );

        let total_read = Arc::new(AtomicUsize::new(0));
        let first_read_len = Arc::new(AtomicUsize::new(0));
        let counting_reader = CountingRead {
            inner: file_bytes,
            total_read: Arc::clone(&total_read),
            first_read_len: Arc::clone(&first_read_len),
        };

        let _file = session
            .open_options()
            .open_read(counting_reader)
            .await
            .unwrap();

        let expected_first_read = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
        let observed_first_read = first_read_len.load(Ordering::Relaxed);
        assert_eq!(
            observed_first_read, expected_first_read,
            "Read exactly the postscript size"
        );

        let bytes_requested = total_read.load(Ordering::Relaxed);
        assert!(
            bytes_requested < ONE_MIB,
            "Read {} bytes, expected < 1MB",
            bytes_requested
        );
    }
}