1use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::ArraySessionExt;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_dtype::DType;
11use vortex_error::{VortexError, VortexExpect, VortexResult};
12use vortex_io::file::IntoReadSource;
13use vortex_io::session::RuntimeSessionExt;
14use vortex_io::{InstrumentedReadAt, VortexReadAt};
15use vortex_layout::segments::{
16 NoOpSegmentCache, SegmentCache, SegmentCacheMetrics, SegmentCacheSourceAdapter, SegmentId,
17 SharedSegmentSource,
18};
19use vortex_layout::session::LayoutSessionExt;
20use vortex_metrics::{MetricsSessionExt, VortexMetrics};
21use vortex_session::VortexSession;
22use vortex_utils::aliases::hash_map::HashMap;
23
24use crate::footer::Footer;
25use crate::segments::{FileSegmentSource, InitialReadSegmentCache};
26use crate::{DeserializeStep, EOF_SIZE, MAX_POSTSCRIPT_SIZE, VortexFile};
27
28const INITIAL_READ_SIZE: usize = 1 << 20; pub struct VortexOpenOptions {
32 session: VortexSession,
34 segment_cache: Arc<dyn SegmentCache>,
36 initial_read_size: usize,
38 file_size: Option<u64>,
40 dtype: Option<DType>,
42 footer: Option<Footer>,
44 initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
46 metrics: VortexMetrics,
48}
49
50pub trait OpenOptionsSessionExt:
51 ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt
52{
53 fn open_options(&self) -> VortexOpenOptions {
55 VortexOpenOptions {
56 session: self.session(),
57 segment_cache: Arc::new(NoOpSegmentCache),
58 initial_read_size: INITIAL_READ_SIZE,
59 file_size: None,
60 dtype: None,
61 footer: None,
62 initial_read_segments: Default::default(),
63 metrics: self.metrics(),
64 }
65 }
66}
67impl<S: ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt>
68 OpenOptionsSessionExt for S
69{
70}
71
72impl VortexOpenOptions {
73 pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
75 self.initial_read_size = initial_read_size;
76 self
77 }
78
79 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
81 self.segment_cache = segment_cache;
82 self
83 }
84
85 pub fn without_segment_cache(self) -> Self {
87 self.with_segment_cache(Arc::new(NoOpSegmentCache))
88 }
89
90 pub fn with_file_size(mut self, file_size: u64) -> Self {
95 self.file_size = Some(file_size);
96 self
97 }
98
99 pub fn with_dtype(mut self, dtype: DType) -> Self {
105 self.dtype = Some(dtype);
106 self
107 }
108
109 pub fn with_footer(mut self, footer: Footer) -> Self {
114 self.dtype = Some(footer.layout().dtype().clone());
115 self.footer = Some(footer);
116 self
117 }
118
119 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
121 self.metrics = metrics;
122 self
123 }
124
125 pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
132 let handle = self.session.handle();
133 let metrics = self.metrics.clone();
134 self.open_read_at(handle.open_read(source, metrics)?).await
135 }
136
137 pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
139 block_on(
141 self.with_initial_read_size(0)
142 .without_segment_cache()
143 .open_read_at(buffer.into()),
144 )
145 }
146
147 pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
151 let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
152
153 let footer = if let Some(footer) = self.footer {
154 footer
155 } else {
156 self.read_footer(read.clone()).await?
157 };
158
159 let segment_cache = Arc::new(SegmentCacheMetrics::new(
160 InitialReadSegmentCache {
161 initial: self.initial_read_segments,
162 fallback: self.segment_cache,
163 },
164 self.metrics.clone(),
165 ));
166
167 let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
169 footer.segment_map().clone(),
170 read,
171 )));
172
173 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
175 segment_cache,
176 segment_source,
177 ));
178
179 Ok(VortexFile {
180 footer,
181 segment_source,
182 metrics: self.metrics,
183 session: self.session.clone(),
184 })
185 }
186
187 async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
188 let file_size = match self.file_size {
190 None => read.size().await?,
191 Some(file_size) => file_size,
192 };
193 let mut initial_read_size = self
194 .initial_read_size
195 .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
197 if let Ok(file_size) = usize::try_from(file_size) {
198 initial_read_size = initial_read_size.min(file_size);
199 }
200
201 let initial_offset = file_size - initial_read_size as u64;
202 let initial_read: ByteBuffer = read
203 .clone()
204 .read_at(initial_offset, initial_read_size, Alignment::none())
205 .await?;
206
207 let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
208 .with_size(file_size)
209 .with_some_dtype(self.dtype.clone());
210
211 let footer = loop {
212 match deserializer.deserialize()? {
213 DeserializeStep::NeedMoreData { offset, len } => {
214 let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
215 deserializer.prefix_data(more_data);
216 }
217 DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
218 DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
219 }
220 }?;
221
222 let initial_offset = file_size - (deserializer.buffer().len() as u64);
225 self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
226
227 Ok(footer)
228 }
229
230 fn populate_initial_segments(
232 &self,
233 initial_offset: u64,
234 initial_read: &ByteBuffer,
235 footer: &Footer,
236 ) {
237 let first_idx = footer
238 .segment_map()
239 .partition_point(|segment| segment.offset < initial_offset);
240
241 let mut initial_read_segments = self.initial_read_segments.write();
242
243 for idx in first_idx..footer.segment_map().len() {
244 let segment = &footer.segment_map()[idx];
245 let segment_id =
246 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
247 let offset =
248 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
249 let buffer = initial_read
250 .slice(offset..offset + (segment.length as usize))
251 .aligned(segment.alignment);
252 initial_read_segments.insert(segment_id, buffer);
253 }
254 }
255}
256
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Opens the Vortex file stored at `path` inside `object_store`.
    ///
    /// Thin wrapper around [`VortexOpenOptions::open`] using an `ObjectStoreReadSource`.
    ///
    /// # Errors
    /// Propagates any error returned by [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let store = Arc::clone(object_store);
        let source = ObjectStoreReadSource::new(store, path.into());
        self.open(source).await
    }
}