1use std::sync::{Arc, RwLock};
2
3use flatbuffers::root;
4use vortex_array::ArrayRegistry;
5use vortex_array::aliases::hash_map::HashMap;
6use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
7use vortex_dtype::DType;
8use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
9use vortex_flatbuffers::{FlatBuffer, ReadFlatBuffer, dtype as fbd};
10use vortex_io::VortexReadAt;
11use vortex_layout::segments::SegmentId;
12use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
13use vortex_metrics::VortexMetrics;
14
15use crate::footer::{FileStatistics, Footer, Postscript, PostscriptSegment};
16use crate::segments::{NoOpSegmentCache, SegmentCache};
17use crate::{DEFAULT_REGISTRY, EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
18
/// Describes a kind of file that [`VortexOpenOptions`] can be parameterised over.
///
/// Implementors choose the concrete type of the kind-specific options that are stored alongside
/// the common open options.
pub trait FileType: Sized {
    /// The kind-specific options held in the `options` field of [`VortexOpenOptions`].
    type Options;
}
22
/// Options used when opening a file of kind `F` and reading its footer.
pub struct VortexOpenOptions<F: FileType> {
    /// Options specific to the file kind `F`.
    pub(crate) options: F::Options,
    /// Array registry handed to `Footer::from_flatbuffer` when the file layout is parsed.
    pub(crate) registry: Arc<ArrayRegistry>,
    /// Layout registry handed to `Footer::from_flatbuffer` when the file layout is parsed.
    pub(crate) layout_registry: Arc<LayoutRegistry>,
    /// Size of the file in bytes, if known up front. When `None`, `read_footer` asks the reader.
    pub(crate) file_size: Option<u64>,
    /// An externally provided `DType`. When set, `read_footer` does not look for a `DType`
    /// segment in the file's postscript.
    pub(crate) dtype: Option<DType>,
    /// An already-parsed footer. When set, `read_footer` returns a clone of it without reading.
    footer: Option<Footer>,
    /// The segment cache configured for this file. Defaults to `NoOpSegmentCache`.
    pub(crate) segment_cache: Arc<dyn SegmentCache>,
    /// Requested size in bytes of the initial read from the end of the file. `read_footer`
    /// raises it to at least `MAX_FOOTER_SIZE + EOF_SIZE` and caps it at the file size.
    pub(crate) initial_read_size: u64,
    /// Segments whose bytes were already covered by the footer read, keyed by segment ID.
    /// Filled in by `populate_initial_segments`.
    pub(crate) initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
    /// The metrics instance associated with these options.
    pub(crate) metrics: VortexMetrics,
}
43
44impl<F: FileType> VortexOpenOptions<F> {
45 pub(crate) fn new(options: F::Options) -> Self {
46 Self {
47 options,
48 registry: DEFAULT_REGISTRY.clone(),
49 layout_registry: Arc::new(LayoutRegistry::default()),
50 file_size: None,
51 dtype: None,
52 footer: None,
53 segment_cache: Arc::new(NoOpSegmentCache),
54 initial_read_size: 0,
55 initial_read_segments: Default::default(),
56 metrics: VortexMetrics::default(),
57 }
58 }
59
60 pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
62 self.registry = registry;
63 self
64 }
65
66 pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
68 self.layout_registry = registry;
69 self
70 }
71
72 pub fn with_file_size(mut self, file_size: u64) -> Self {
77 self.file_size = Some(file_size);
78 self
79 }
80
81 pub fn with_dtype(mut self, dtype: DType) -> Self {
87 self.dtype = Some(dtype);
88 self
89 }
90
91 pub fn with_footer(mut self, footer: Footer) -> Self {
96 self.dtype = Some(footer.layout().dtype().clone());
97 self.footer = Some(footer);
98 self
99 }
100
101 pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
103 self.initial_read_size = initial_read_size;
104 self
105 }
106
107 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
109 self.segment_cache = segment_cache;
110 self
111 }
112
113 pub fn without_segment_cache(self) -> Self {
115 self.with_segment_cache(Arc::new(NoOpSegmentCache))
116 }
117
118 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120 self.metrics = metrics;
121 self
122 }
123}
124
125impl<F: FileType> VortexOpenOptions<F> {
126 pub(crate) async fn read_footer<R: VortexReadAt>(&self, read: &R) -> VortexResult<Footer> {
128 if let Some(footer) = self.footer.as_ref() {
129 return Ok(footer.clone());
130 }
131
132 let file_size = match self.file_size {
134 None => read.size().await?,
135 Some(file_size) => file_size,
136 };
137 let initial_read_size = self
138 .initial_read_size
139 .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
141 .min(file_size);
142 let mut initial_offset = file_size - initial_read_size;
143 let mut initial_read: ByteBuffer = read
144 .read_byte_range(initial_offset..file_size, Alignment::none())
145 .await?;
146
147 let postscript = self.parse_postscript(&initial_read)?;
149
150 let dtype_segment = self.dtype.is_none().then(|| postscript.dtype.ok_or_else(|| vortex_err!("Vortex file doesn't embed a DType and one has not been provided to VortexOpenOptions"))).transpose()?;
152
153 let mut read_more_offset = initial_offset;
156 if let Some(dtype_segment) = &dtype_segment {
157 read_more_offset = read_more_offset.min(dtype_segment.offset);
158 }
159 if let Some(stats_segment) = &postscript.statistics {
160 read_more_offset = read_more_offset.min(stats_segment.offset);
161 }
162 read_more_offset = read_more_offset.min(postscript.layout.offset);
163
164 if read_more_offset < initial_offset {
166 log::info!(
167 "Initial read from {} did not cover all footer segments, reading from {}",
168 initial_offset,
169 read_more_offset
170 );
171
172 let mut new_initial_read =
173 ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
174 new_initial_read.extend_from_slice(
175 &read
176 .read_byte_range(read_more_offset..initial_offset, Alignment::none())
177 .await?,
178 );
179 new_initial_read.extend_from_slice(&initial_read);
180
181 initial_offset = read_more_offset;
182 initial_read = new_initial_read.freeze();
183 }
184
185 let dtype = dtype_segment
187 .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
188 .transpose()?
189 .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
190 let file_stats = postscript
191 .statistics
192 .map(|segment| {
193 self.parse_flatbuffer::<FileStatistics>(initial_offset, &initial_read, &segment)
194 })
195 .transpose()?;
196 let footer = self.parse_file_layout(
197 initial_offset,
198 &initial_read,
199 &postscript.layout,
200 dtype,
201 file_stats,
202 )?;
203
204 self.populate_initial_segments(initial_offset, &initial_read, &footer);
207
208 Ok(footer)
209 }
210
211 fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
213 let eof_loc = initial_read.len() - EOF_SIZE;
214 let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
215
216 let magic_number = &initial_read[magic_bytes_loc..];
217 if magic_number != MAGIC_BYTES {
218 vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
219 }
220
221 let version = u16::from_le_bytes(
222 initial_read[eof_loc..eof_loc + 2]
223 .try_into()
224 .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
225 );
226 if version != VERSION {
227 vortex_bail!("Malformed file, unsupported version {version}")
228 }
229
230 let ps_size = u16::from_le_bytes(
231 initial_read[eof_loc + 2..eof_loc + 4]
232 .try_into()
233 .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
234 ) as usize;
235
236 Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
237 }
238
239 fn parse_dtype(
241 &self,
242 initial_offset: u64,
243 initial_read: &ByteBuffer,
244 segment: &PostscriptSegment,
245 ) -> VortexResult<DType> {
246 let offset = usize::try_from(segment.offset - initial_offset)?;
247 let sliced_buffer =
248 FlatBuffer::align_from(initial_read.slice(offset..offset + (segment.length as usize)));
249 let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
250
251 DType::try_from_view(fbd_dtype, sliced_buffer.clone())
252 }
253
254 fn parse_flatbuffer<T: ReadFlatBuffer<Error = VortexError>>(
256 &self,
257 initial_offset: u64,
258 initial_read: &ByteBuffer,
259 segment: &PostscriptSegment,
260 ) -> VortexResult<T> {
261 let offset = usize::try_from(segment.offset - initial_offset)?;
262 let sliced_buffer =
263 FlatBuffer::align_from(initial_read.slice(offset..offset + (segment.length as usize)));
264 T::read_flatbuffer_bytes(&sliced_buffer)
265 }
266
267 fn parse_file_layout(
269 &self,
270 initial_offset: u64,
271 initial_read: &ByteBuffer,
272 layout_segment: &PostscriptSegment,
273 dtype: DType,
274 file_stats: Option<FileStatistics>,
275 ) -> VortexResult<Footer> {
276 let offset = usize::try_from(layout_segment.offset - initial_offset)?;
277 let bytes = FlatBuffer::align_from(
278 initial_read.slice(offset..offset + (layout_segment.length as usize)),
279 );
280 Footer::from_flatbuffer(
281 bytes,
282 dtype,
283 file_stats,
284 &self.registry,
285 &self.layout_registry,
286 )
287 }
288
289 fn populate_initial_segments(
291 &self,
292 initial_offset: u64,
293 initial_read: &ByteBuffer,
294 footer: &Footer,
295 ) {
296 let first_idx = footer
297 .segment_map()
298 .partition_point(|segment| segment.offset < initial_offset);
299
300 let mut initial_segments = self
301 .initial_read_segments
302 .write()
303 .vortex_expect("poisoned lock");
304
305 for idx in first_idx..footer.segment_map().len() {
306 let segment = &footer.segment_map()[idx];
307 let segment_id =
308 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
309 let offset =
310 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
311 let buffer = initial_read
312 .slice(offset..offset + (segment.length as usize))
313 .aligned(segment.alignment);
314 initial_segments.insert(segment_id, buffer);
315 }
316 }
317}