1use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::dtype::DType;
9use vortex_array::session::ArraySessionExt;
10use vortex_buffer::Alignment;
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::VortexReadAt;
16use vortex_io::session::RuntimeSessionExt;
17use vortex_layout::segments::InstrumentedSegmentCache;
18use vortex_layout::segments::NoOpSegmentCache;
19use vortex_layout::segments::SegmentCache;
20use vortex_layout::segments::SegmentCacheSourceAdapter;
21use vortex_layout::segments::SegmentId;
22use vortex_layout::segments::SharedSegmentSource;
23use vortex_layout::session::LayoutSessionExt;
24use vortex_metrics::DefaultMetricsRegistry;
25use vortex_metrics::Label;
26use vortex_metrics::MetricsRegistry;
27use vortex_session::VortexSession;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::DeserializeStep;
31use crate::EOF_SIZE;
32use crate::MAX_POSTSCRIPT_SIZE;
33use crate::VortexFile;
34use crate::footer::Footer;
35use crate::segments::BufferSegmentSource;
36use crate::segments::FileSegmentSource;
37use crate::segments::InitialReadSegmentCache;
38use crate::segments::RequestMetrics;
39
40const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
41
42pub struct VortexOpenOptions {
44 session: VortexSession,
46 segment_cache: Option<Arc<dyn SegmentCache>>,
48 initial_read_size: usize,
50 file_size: Option<u64>,
52 dtype: Option<DType>,
54 footer: Option<Footer>,
56 initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
58 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
60 labels: Vec<Label>,
62}
63
64pub trait OpenOptionsSessionExt: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt {
65 fn open_options(&self) -> VortexOpenOptions {
67 VortexOpenOptions {
68 session: self.session(),
69 segment_cache: None,
70 initial_read_size: INITIAL_READ_SIZE,
71 file_size: None,
72 dtype: None,
73 footer: None,
74 initial_read_segments: Default::default(),
75 metrics_registry: None,
76 labels: Vec::default(),
77 }
78 }
79}
80impl<S: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt> OpenOptionsSessionExt for S {}
81
82impl VortexOpenOptions {
83 pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
85 self.initial_read_size = initial_read_size;
86 self
87 }
88
89 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
91 self.segment_cache = Some(segment_cache);
92 self
93 }
94
95 pub fn with_file_size(mut self, file_size: u64) -> Self {
100 self.file_size = Some(file_size);
101 self
102 }
103
104 pub fn with_some_file_size(mut self, file_size: Option<u64>) -> Self {
109 self.file_size = file_size;
110 self
111 }
112
113 pub fn with_dtype(mut self, dtype: DType) -> Self {
119 self.dtype = Some(dtype);
120 self
121 }
122
123 pub fn with_footer(mut self, footer: Footer) -> Self {
128 self.dtype = Some(footer.layout().dtype().clone());
129 self.footer = Some(footer);
130 self
131 }
132
133 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
135 self.metrics_registry = Some(metrics);
136 self
137 }
138
139 pub fn with_labels(mut self, labels: Vec<Label>) -> Self {
141 self.labels.extend(labels);
142 self
143 }
144
145 pub async fn open(self, source: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
152 self.open_read(source).await
153 }
154
155 #[cfg(not(target_arch = "wasm32"))]
157 pub async fn open_path(self, path: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
158 use vortex_io::std_file::FileReadAt;
159 let handle = self.session.handle();
160 let source = Arc::new(FileReadAt::open(path, handle)?);
161 self.open(source).await
162 }
163
164 pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
169 let buffer: ByteBuffer = buffer.into();
170
171 if self.segment_cache.is_some() {
172 tracing::warn!("segment cache is ignored for in-memory `open_buffer`");
173 }
174 if self.metrics_registry.is_some() {
175 tracing::warn!("metrics registry is ignored for in-memory `open_buffer`");
176 }
177
178 let mut opts = self.with_initial_read_size(0);
179
180 let footer = match opts.footer.take() {
181 Some(footer) => footer,
182 None => block_on(opts.read_footer(&buffer))?,
183 };
184
185 let segment_source = Arc::new(BufferSegmentSource::new(
186 buffer,
187 footer.segment_map().clone(),
188 ));
189
190 Ok(VortexFile {
191 footer,
192 segment_source,
193 session: opts.session,
194 })
195 }
196
197 pub async fn open_read<R: VortexReadAt + Clone>(self, reader: R) -> VortexResult<VortexFile> {
199 let segment_cache = self
200 .segment_cache
201 .clone()
202 .unwrap_or_else(|| Arc::new(NoOpSegmentCache));
203
204 let metrics_registry = self
205 .metrics_registry
206 .clone()
207 .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default()));
208
209 let footer = if let Some(footer) = self.footer {
210 footer
211 } else {
212 self.read_footer(&reader).await?
213 };
214
215 let segment_cache = Arc::new(InstrumentedSegmentCache::new(
216 InitialReadSegmentCache {
217 initial: self.initial_read_segments,
218 fallback: segment_cache,
219 },
220 metrics_registry.as_ref(),
221 self.labels.clone(),
222 ));
223
224 let metrics = RequestMetrics::new(metrics_registry.as_ref(), self.labels);
225
226 let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open(
228 footer.segment_map().clone(),
229 reader,
230 self.session.handle(),
231 metrics,
232 )));
233
234 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
236 segment_cache,
237 segment_source,
238 ));
239
240 Ok(VortexFile {
241 footer,
242 segment_source,
243 session: self.session.clone(),
244 })
245 }
246
247 async fn read_footer(&self, read: &dyn VortexReadAt) -> VortexResult<Footer> {
248 let file_size = match self.file_size {
250 None => read.size().await?,
251 Some(file_size) => file_size,
252 };
253 let mut initial_read_size = self
254 .initial_read_size
255 .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
257 if let Ok(file_size) = usize::try_from(file_size) {
258 initial_read_size = initial_read_size.min(file_size);
259 }
260
261 let initial_offset = file_size - initial_read_size as u64;
262 let initial_read: ByteBuffer = read
263 .read_at(initial_offset, initial_read_size, Alignment::none())
264 .await?
265 .try_into_host()?
266 .await?;
267
268 let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
269 .with_size(file_size)
270 .with_some_dtype(self.dtype.clone());
271
272 let footer = loop {
273 match deserializer.deserialize()? {
274 DeserializeStep::NeedMoreData { offset, len } => {
275 let more_data = read
276 .read_at(offset, len, Alignment::none())
277 .await?
278 .try_into_host()?
279 .await?;
280 deserializer.prefix_data(more_data);
281 }
282 DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
283 DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
284 }
285 }?;
286
287 let initial_offset = file_size - (deserializer.buffer().len() as u64);
290 self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
291
292 Ok(footer)
293 }
294
295 fn populate_initial_segments(
297 &self,
298 initial_offset: u64,
299 initial_read: &ByteBuffer,
300 footer: &Footer,
301 ) {
302 let first_idx = footer
303 .segment_map()
304 .partition_point(|segment| segment.offset < initial_offset);
305
306 let mut initial_read_segments = self.initial_read_segments.write();
307
308 for idx in first_idx..footer.segment_map().len() {
309 let segment = &footer.segment_map()[idx];
310 let segment_id =
311 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
312 let offset =
313 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
314 let buffer = initial_read
315 .slice(offset..offset + (segment.length as usize))
316 .aligned(segment.alignment);
317 initial_read_segments.insert(segment_id, buffer);
318 }
319 }
320}
321
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Opens the file stored at `path` in the given object store.
    ///
    /// Only available when the `object_store` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the Vortex file cannot be opened from the object store.
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::object_store::ObjectStoreReadAt;

        let handle = self.session.handle();
        let store = Arc::clone(object_store);
        let reader = ObjectStoreReadAt::new(store, path.into(), handle);
        let source = Arc::new(reader);
        self.open(source).await
    }
}
340
341#[cfg(test)]
342mod tests {
343 use std::sync::atomic::AtomicUsize;
344 use std::sync::atomic::Ordering;
345
346 use futures::future::BoxFuture;
347 use vortex_array::IntoArray;
348 use vortex_array::buffer::BufferHandle;
349 use vortex_array::dtype::session::DTypeSession;
350 use vortex_array::scalar_fn::session::ScalarFnSession;
351 use vortex_array::session::ArraySession;
352 use vortex_buffer::Buffer;
353 use vortex_buffer::ByteBufferMut;
354 use vortex_io::session::RuntimeSession;
355 use vortex_layout::session::LayoutSession;
356
357 use super::*;
358 use crate::WriteOptionsSessionExt;
359
360 #[derive(Clone)]
361 struct CountingRead<R> {
363 inner: R,
364 total_read: Arc<AtomicUsize>,
365 first_read_len: Arc<AtomicUsize>,
366 }
367
368 impl<R: VortexReadAt + Clone> VortexReadAt for CountingRead<R> {
369 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
370 self.inner.size()
371 }
372
373 fn read_at(
374 &self,
375 offset: u64,
376 length: usize,
377 alignment: Alignment,
378 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
379 self.total_read.fetch_add(length, Ordering::Relaxed);
380 let _ = self.first_read_len.compare_exchange(
381 0,
382 length,
383 Ordering::Relaxed,
384 Ordering::Relaxed,
385 );
386 self.inner.read_at(offset, length, alignment)
387 }
388
389 fn concurrency(&self) -> usize {
390 self.inner.concurrency()
391 }
392 }
393
394 #[tokio::test]
395 async fn test_initial_read_size() {
396 let mut buf = ByteBufferMut::empty();
398 let mut session = VortexSession::empty()
399 .with::<DTypeSession>()
400 .with::<ArraySession>()
401 .with::<LayoutSession>()
402 .with::<ScalarFnSession>()
403 .with::<RuntimeSession>();
404
405 crate::register_default_encodings(&mut session);
406
407 let array = Buffer::from(
409 (0i32..1_500_000)
410 .map(|i| if i % 2 == 0 { i } else { -i })
411 .collect::<Vec<i32>>(),
412 )
413 .into_array();
414
415 session
416 .write_options()
417 .write(&mut buf, array.to_array_stream())
418 .await
419 .unwrap();
420
421 let buffer = ByteBuffer::from(buf);
422 assert!(
423 buffer.len() > 1024 * 1024,
424 "Buffer length is only {} bytes",
425 buffer.len()
426 );
427
428 let total_read = Arc::new(AtomicUsize::new(0));
429 let first_read_len = Arc::new(AtomicUsize::new(0));
430 let reader = CountingRead {
431 inner: buffer,
432 total_read: total_read.clone(),
433 first_read_len: first_read_len.clone(),
434 };
435
436 let _file = session.open_options().open_read(reader).await.unwrap();
438
439 let first = first_read_len.load(Ordering::Relaxed);
441 assert_eq!(
442 first,
443 MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE,
444 "Read exactly the postscript size"
445 );
446 let read = total_read.load(Ordering::Relaxed);
447 assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read);
448 }
449}