1use std::sync::Arc;
2
3use futures::{StreamExt, pin_mut};
4use vortex_error::{VortexExpect, VortexResult};
5use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
6use vortex_layout::segments::{SegmentEvents, SegmentSource};
7use vortex_metrics::VortexMetrics;
8
9use crate::driver::CoalescedDriver;
10use crate::segments::{
11 InitialReadSegmentCache, MokaSegmentCache, SegmentCache, SegmentCacheMetrics,
12 SegmentCacheSourceAdapter,
13};
14use crate::{FileType, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
15
16pub struct GenericVortexFile;
21
22impl FileType for GenericVortexFile {
23 type Options = GenericFileOptions;
24}
25
26impl VortexOpenOptions<GenericVortexFile> {
27 const INITIAL_READ_SIZE: u64 = 1 << 20; pub fn file() -> Self {
31 Self::new(Default::default())
32 .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
35 .with_initial_read_size(Self::INITIAL_READ_SIZE)
36 }
37
38 pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
39 self.options.io_concurrency = io_concurrency;
40 self
41 }
42
43 pub async fn open<R: VortexReadAt + Send + Sync>(self, read: R) -> VortexResult<VortexFile> {
44 let read = Arc::new(read);
45 let footer = self.read_footer(&read).await?;
46
47 let segment_cache = Arc::new(SegmentCacheMetrics::new(
48 InitialReadSegmentCache {
49 initial: self.initial_read_segments,
50 fallback: self.segment_cache,
51 },
52 self.metrics.clone(),
53 ));
54
55 let segment_source_factory = Arc::new(GenericVortexFileIo {
56 read,
57 segment_map: footer.segment_map().clone(),
58 segment_cache,
59 options: self.options,
60 });
61
62 Ok(VortexFile {
63 footer,
64 segment_source_factory,
65 metrics: self.metrics,
66 })
67 }
68}
69
70struct GenericVortexFileIo<R> {
71 read: Arc<R>,
72 segment_map: Arc<[SegmentSpec]>,
73 segment_cache: Arc<dyn SegmentCache>,
74 options: GenericFileOptions,
75}
76
77impl<R: VortexReadAt + Send + Sync> SegmentSourceFactory for GenericVortexFileIo<R> {
78 fn segment_source(&self, metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
79 let (segment_source, events) = SegmentEvents::create();
81
82 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
84 self.segment_cache.clone(),
85 segment_source,
86 ));
87
88 let read = InstrumentedReadAt::new(self.read.clone(), &metrics);
89
90 let driver = CoalescedDriver::new(
91 read.performance_hint(),
92 self.segment_map.clone(),
93 events,
94 metrics,
95 );
96
97 let io_concurrency = self.options.io_concurrency;
99 self.options
100 .io_dispatcher
101 .dispatch(move || {
102 async move {
103 let stream = driver
105 .map(|coalesced_req| coalesced_req.launch(&read))
106 .buffer_unordered(io_concurrency);
107 pin_mut!(stream);
108
109 stream.collect::<()>().await
111 }
112 })
113 .vortex_expect("Failed to spawn I/O driver");
114
115 segment_source
116 }
117}
118
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    /// Opens the Vortex file at `path`, preferring local disk when possible.
    ///
    /// If `/` joined with `path` exists on the local filesystem, that file is opened
    /// with `TokioFile`. Otherwise the file is read from `object_store` through
    /// `ObjectStoreReadAt`.
    ///
    /// # Errors
    ///
    /// Returns an error if the local file cannot be opened, or if opening the
    /// Vortex file (e.g. reading its footer) fails.
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::{ObjectStoreReadAt, TokioFile};

        // NOTE(review): heuristic. A remote object whose key also exists as a local
        // path will be served from local disk. `Path::exists` is also a blocking
        // filesystem call made from async context.
        let local_path = Path::new("/").join(path);
        if !local_path.exists() {
            let remote = ObjectStoreReadAt::new(object_store.clone(), path.into(), None);
            return self.open(remote).await;
        }

        let local_file = TokioFile::open(local_path)?;
        self.open(local_file).await
    }
}
148
149#[derive(Clone)]
150pub struct GenericFileOptions {
151 io_concurrency: usize,
154 io_dispatcher: IoDispatcher,
156}
157
158impl Default for GenericFileOptions {
159 fn default() -> Self {
160 Self {
161 io_concurrency: 8,
162 io_dispatcher: IoDispatcher::shared(),
163 }
164 }
165}