1use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::session::ArraySessionExt;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::InstrumentedReadAt;
16use vortex_io::VortexReadAt;
17use vortex_io::file::IntoReadSource;
18use vortex_io::session::RuntimeSessionExt;
19use vortex_layout::segments::NoOpSegmentCache;
20use vortex_layout::segments::SegmentCache;
21use vortex_layout::segments::SegmentCacheMetrics;
22use vortex_layout::segments::SegmentCacheSourceAdapter;
23use vortex_layout::segments::SegmentId;
24use vortex_layout::segments::SharedSegmentSource;
25use vortex_layout::session::LayoutSessionExt;
26use vortex_metrics::MetricsSessionExt;
27use vortex_metrics::VortexMetrics;
28use vortex_session::VortexSession;
29use vortex_utils::aliases::hash_map::HashMap;
30
31use crate::DeserializeStep;
32use crate::EOF_SIZE;
33use crate::MAX_POSTSCRIPT_SIZE;
34use crate::VortexFile;
35use crate::footer::Footer;
36use crate::segments::FileSegmentSource;
37use crate::segments::InitialReadSegmentCache;
38
39const INITIAL_READ_SIZE: usize = 1 << 20; pub struct VortexOpenOptions {
43 session: VortexSession,
45 segment_cache: Arc<dyn SegmentCache>,
47 initial_read_size: usize,
49 file_size: Option<u64>,
51 dtype: Option<DType>,
53 footer: Option<Footer>,
55 initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
57 metrics: VortexMetrics,
59}
60
61pub trait OpenOptionsSessionExt:
62 ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt
63{
64 fn open_options(&self) -> VortexOpenOptions {
66 VortexOpenOptions {
67 session: self.session(),
68 segment_cache: Arc::new(NoOpSegmentCache),
69 initial_read_size: INITIAL_READ_SIZE,
70 file_size: None,
71 dtype: None,
72 footer: None,
73 initial_read_segments: Default::default(),
74 metrics: self.metrics(),
75 }
76 }
77}
78impl<S: ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt>
79 OpenOptionsSessionExt for S
80{
81}
82
83impl VortexOpenOptions {
84 pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
86 self.initial_read_size = initial_read_size;
87 self
88 }
89
90 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
92 self.segment_cache = segment_cache;
93 self
94 }
95
96 pub fn without_segment_cache(self) -> Self {
98 self.with_segment_cache(Arc::new(NoOpSegmentCache))
99 }
100
101 pub fn with_file_size(mut self, file_size: u64) -> Self {
106 self.file_size = Some(file_size);
107 self
108 }
109
110 pub fn with_dtype(mut self, dtype: DType) -> Self {
116 self.dtype = Some(dtype);
117 self
118 }
119
120 pub fn with_footer(mut self, footer: Footer) -> Self {
125 self.dtype = Some(footer.layout().dtype().clone());
126 self.footer = Some(footer);
127 self
128 }
129
130 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
132 self.metrics = metrics;
133 self
134 }
135
136 pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
143 let handle = self.session.handle();
144 let metrics = self.metrics.clone();
145 self.open_read_at(handle.open_read(source, metrics)?).await
146 }
147
148 pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
150 block_on(
152 self.with_initial_read_size(0)
153 .without_segment_cache()
154 .open_read_at(buffer.into()),
155 )
156 }
157
158 pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
162 let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
163
164 let footer = if let Some(footer) = self.footer {
165 footer
166 } else {
167 self.read_footer(read.clone()).await?
168 };
169
170 let segment_cache = Arc::new(SegmentCacheMetrics::new(
171 InitialReadSegmentCache {
172 initial: self.initial_read_segments,
173 fallback: self.segment_cache,
174 },
175 self.metrics.clone(),
176 ));
177
178 let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
180 footer.segment_map().clone(),
181 read,
182 )));
183
184 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
186 segment_cache,
187 segment_source,
188 ));
189
190 Ok(VortexFile {
191 footer,
192 segment_source,
193 metrics: self.metrics,
194 session: self.session.clone(),
195 })
196 }
197
198 async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
199 let file_size = match self.file_size {
201 None => read.size().await?,
202 Some(file_size) => file_size,
203 };
204 let mut initial_read_size = self
205 .initial_read_size
206 .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
208 if let Ok(file_size) = usize::try_from(file_size) {
209 initial_read_size = initial_read_size.min(file_size);
210 }
211
212 let initial_offset = file_size - initial_read_size as u64;
213 let initial_read: ByteBuffer = read
214 .clone()
215 .read_at(initial_offset, initial_read_size, Alignment::none())
216 .await?;
217
218 let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
219 .with_size(file_size)
220 .with_some_dtype(self.dtype.clone());
221
222 let footer = loop {
223 match deserializer.deserialize()? {
224 DeserializeStep::NeedMoreData { offset, len } => {
225 let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
226 deserializer.prefix_data(more_data);
227 }
228 DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
229 DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
230 }
231 }?;
232
233 let initial_offset = file_size - (deserializer.buffer().len() as u64);
236 self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
237
238 Ok(footer)
239 }
240
241 fn populate_initial_segments(
243 &self,
244 initial_offset: u64,
245 initial_read: &ByteBuffer,
246 footer: &Footer,
247 ) {
248 let first_idx = footer
249 .segment_map()
250 .partition_point(|segment| segment.offset < initial_offset);
251
252 let mut initial_read_segments = self.initial_read_segments.write();
253
254 for idx in first_idx..footer.segment_map().len() {
255 let segment = &footer.segment_map()[idx];
256 let segment_id =
257 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
258 let offset =
259 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
260 let buffer = initial_read
261 .slice(offset..offset + (segment.length as usize))
262 .aligned(segment.alignment);
263 initial_read_segments.insert(segment_id, buffer);
264 }
265 }
266}
267
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Opens the Vortex file stored at `path` in `object_store`.
    ///
    /// The store handle is cloned into an `ObjectStoreReadSource`, which is then passed to
    /// [`VortexOpenOptions::open`].
    ///
    /// # Errors
    ///
    /// Propagates any error from [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let store = Arc::clone(object_store);
        let source = ObjectStoreReadSource::new(store, path.into());
        self.open(source).await
    }
}