1use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::ArrayRegistry;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_dtype::DType;
11use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail};
12use vortex_io::file::IntoReadSource;
13use vortex_io::runtime::Handle;
14use vortex_io::{InstrumentedReadAt, VortexReadAt};
15use vortex_layout::segments::{
16 NoOpSegmentCache, SegmentCache, SegmentCacheMetrics, SegmentCacheSourceAdapter, SegmentId,
17 SharedSegmentSource,
18};
19use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
20use vortex_metrics::VortexMetrics;
21use vortex_utils::aliases::hash_map::HashMap;
22
23use crate::footer::Footer;
24use crate::segments::{FileSegmentSource, InitialReadSegmentCache};
25use crate::{DEFAULT_REGISTRY, DeserializeStep, EOF_SIZE, MAX_POSTSCRIPT_SIZE, VortexFile};
26
27const INITIAL_READ_SIZE: usize = 1 << 20; pub struct VortexOpenOptions {
31 handle: Option<Handle>,
33 segment_cache: Arc<dyn SegmentCache>,
35 initial_read_size: usize,
37 registry: Arc<ArrayRegistry>,
39 layout_registry: Arc<LayoutRegistry>,
41 file_size: Option<u64>,
43 dtype: Option<DType>,
45 footer: Option<Footer>,
47 initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
49 metrics: VortexMetrics,
51}
52
53impl Default for VortexOpenOptions {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl VortexOpenOptions {
60 pub fn new() -> Self {
65 Self {
66 handle: Handle::find(),
67 segment_cache: Arc::new(NoOpSegmentCache),
68 initial_read_size: INITIAL_READ_SIZE,
69 registry: DEFAULT_REGISTRY.clone(),
70 layout_registry: Arc::new(LayoutRegistry::default()),
71 file_size: None,
72 dtype: None,
73 footer: None,
74 initial_read_segments: Default::default(),
75 metrics: VortexMetrics::default(),
76 }
77 }
78
79 pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
81 self.initial_read_size = initial_read_size;
82 self
83 }
84
85 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
87 self.segment_cache = segment_cache;
88 self
89 }
90
91 pub fn without_segment_cache(self) -> Self {
93 self.with_segment_cache(Arc::new(NoOpSegmentCache))
94 }
95
96 pub fn with_handle(mut self, handle: Handle) -> Self {
106 self.handle = Some(handle);
107 self
108 }
109
110 pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
112 self.registry = registry;
113 self
114 }
115
116 pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
118 self.layout_registry = registry;
119 self
120 }
121
122 pub fn with_file_size(mut self, file_size: u64) -> Self {
127 self.file_size = Some(file_size);
128 self
129 }
130
131 pub fn with_dtype(mut self, dtype: DType) -> Self {
137 self.dtype = Some(dtype);
138 self
139 }
140
141 pub fn with_footer(mut self, footer: Footer) -> Self {
146 self.dtype = Some(footer.layout().dtype().clone());
147 self.footer = Some(footer);
148 self
149 }
150
151 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
153 self.metrics = metrics;
154 self
155 }
156
157 pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
164 let Some(handle) = self.handle.clone() else {
165 vortex_bail!("VortexOpenOptions::handle must be set, or else be running inside Tokio");
166 };
167 let metrics = self.metrics.clone();
168 self.open_read_at(handle.open_read(source, metrics)?).await
169 }
170
171 pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
173 block_on(
175 self.with_initial_read_size(0)
176 .without_segment_cache()
177 .open_read_at(buffer.into()),
178 )
179 }
180
181 pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
185 let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
186
187 let footer = if let Some(footer) = self.footer {
188 footer
189 } else {
190 self.read_footer(read.clone()).await?
191 };
192
193 let segment_cache = Arc::new(SegmentCacheMetrics::new(
194 InitialReadSegmentCache {
195 initial: self.initial_read_segments,
196 fallback: self.segment_cache,
197 },
198 self.metrics.clone(),
199 ));
200
201 let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
203 footer.segment_map().clone(),
204 read,
205 )));
206
207 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
209 segment_cache,
210 segment_source,
211 ));
212
213 Ok(VortexFile {
214 footer,
215 segment_source,
216 metrics: self.metrics,
217 })
218 }
219
220 async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
221 let file_size = match self.file_size {
223 None => read.size().await?,
224 Some(file_size) => file_size,
225 };
226 let mut initial_read_size = self
227 .initial_read_size
228 .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
230 if let Ok(file_size) = usize::try_from(file_size) {
231 initial_read_size = initial_read_size.min(file_size);
232 }
233
234 let initial_offset = file_size - initial_read_size as u64;
235 let initial_read: ByteBuffer = read
236 .clone()
237 .read_at(initial_offset, initial_read_size, Alignment::none())
238 .await?;
239
240 let mut deserializer = Footer::deserializer(initial_read)
241 .with_size(file_size)
242 .with_some_dtype(self.dtype.clone())
243 .with_array_registry(self.registry.clone())
244 .with_layout_registry(self.layout_registry.clone());
245
246 let footer = loop {
247 match deserializer.deserialize()? {
248 DeserializeStep::NeedMoreData { offset, len } => {
249 let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
250 deserializer.prefix_data(more_data);
251 }
252 DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
253 DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
254 }
255 }?;
256
257 let initial_offset = file_size - (deserializer.buffer().len() as u64);
260 self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
261
262 Ok(footer)
263 }
264
265 fn populate_initial_segments(
267 &self,
268 initial_offset: u64,
269 initial_read: &ByteBuffer,
270 footer: &Footer,
271 ) {
272 let first_idx = footer
273 .segment_map()
274 .partition_point(|segment| segment.offset < initial_offset);
275
276 let mut initial_read_segments = self.initial_read_segments.write();
277
278 for idx in first_idx..footer.segment_map().len() {
279 let segment = &footer.segment_map()[idx];
280 let segment_id =
281 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
282 let offset =
283 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
284 let buffer = initial_read
285 .slice(offset..offset + (segment.length as usize))
286 .aligned(segment.alignment);
287 initial_read_segments.insert(segment_id, buffer);
288 }
289 }
290}
291
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Opens the Vortex file located at `path` inside `object_store`.
    ///
    /// Builds an object-store read source for the path and delegates to [`Self::open`], so the
    /// same errors apply (including the missing-handle error).
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let source = ObjectStoreReadSource::new(Arc::clone(object_store), path.into());
        self.open(source).await
    }
}