// vortex_file/generic.rs

1use std::sync::Arc;
2
3use futures::{StreamExt, pin_mut};
4use vortex_error::{VortexExpect, VortexResult};
5use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
6use vortex_layout::segments::{SegmentEvents, SegmentSource};
7use vortex_metrics::VortexMetrics;
8
9use crate::driver::CoalescedDriver;
10use crate::segments::{
11    InitialReadSegmentCache, MokaSegmentCache, SegmentCache, SegmentCacheMetrics,
12    SegmentCacheSourceAdapter,
13};
14use crate::{FileType, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
15
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
///
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
/// Configured via [`GenericFileOptions`] (I/O concurrency and dispatcher).
// TODO(ngates): rename to TokioVortexFile
pub struct GenericVortexFile;
21
impl FileType for GenericVortexFile {
    // Options accepted by `VortexOpenOptions<GenericVortexFile>` for this file type.
    type Options = GenericFileOptions;
}
25
26impl VortexOpenOptions<GenericVortexFile> {
27    const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB
28
29    /// Open a file using the provided [`VortexReadAt`] implementation.
30    pub fn file() -> Self {
31        Self::new(Default::default())
32            // Start with an initial in-memory cache of 256MB.
33            // TODO(ngates): would it be better to default to a home directory disk cache?
34            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
35            .with_initial_read_size(Self::INITIAL_READ_SIZE)
36    }
37
38    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
39        self.options.io_concurrency = io_concurrency;
40        self
41    }
42
43    pub async fn open<R: VortexReadAt + Send + Sync>(self, read: R) -> VortexResult<VortexFile> {
44        let read = Arc::new(read);
45        let footer = self.read_footer(&read).await?;
46
47        let segment_cache = Arc::new(SegmentCacheMetrics::new(
48            InitialReadSegmentCache {
49                initial: self.initial_read_segments,
50                fallback: self.segment_cache,
51            },
52            self.metrics.clone(),
53        ));
54
55        let segment_source_factory = Arc::new(GenericVortexFileIo {
56            read,
57            segment_map: footer.segment_map().clone(),
58            segment_cache,
59            options: self.options,
60        });
61
62        Ok(VortexFile {
63            footer,
64            segment_source_factory,
65            metrics: self.metrics,
66        })
67    }
68}
69
/// Per-file state used to build segment sources for a [`GenericVortexFile`].
struct GenericVortexFileIo<R> {
    // Underlying reader, shared with each spawned I/O driver.
    read: Arc<R>,
    // Byte-range specs for every segment, taken from the file footer.
    segment_map: Arc<[SegmentSpec]>,
    // Cache consulted before any read is issued against `read`.
    segment_cache: Arc<dyn SegmentCache>,
    // I/O concurrency and dispatcher configuration.
    options: GenericFileOptions,
}
76
77impl<R: VortexReadAt + Send + Sync> SegmentSourceFactory for GenericVortexFileIo<R> {
78    fn segment_source(&self, metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
79        // We use segment events for driving I/O.
80        let (segment_source, events) = SegmentEvents::create();
81
82        // Wrap the source to resolve segments from the initial read cache.
83        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
84            self.segment_cache.clone(),
85            segment_source,
86        ));
87
88        let read = InstrumentedReadAt::new(self.read.clone(), &metrics);
89
90        let driver = CoalescedDriver::new(
91            read.performance_hint(),
92            self.segment_map.clone(),
93            events,
94            metrics,
95        );
96
97        // Spawn an I/O driver onto the dispatcher.
98        let io_concurrency = self.options.io_concurrency;
99        self.options
100            .io_dispatcher
101            .dispatch(move || {
102                async move {
103                    // Drive the segment event stream.
104                    let stream = driver
105                        .map(|coalesced_req| coalesced_req.launch(&read))
106                        .buffer_unordered(io_concurrency);
107                    pin_mut!(stream);
108
109                    // Drive the stream to completion.
110                    stream.collect::<()>().await
111                }
112            })
113            .vortex_expect("Failed to spawn I/O driver");
114
115        segment_source
116    }
117}
118
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    /// Open a file at `path` inside the given [`object_store::ObjectStore`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::{ObjectStoreReadAt, TokioFile};

        // If the file is local, we much prefer to use TokioFile since object store re-opens the
        // file on every read. This check is a little naive... but we hope that ObjectStore will
        // soon expose the scheme in a way that we can check more thoroughly.
        // See: https://github.com/apache/arrow-rs-object-store/issues/259
        let local_path = Path::new("/").join(path);
        if !local_path.exists() {
            // Not visible on the local filesystem: read through the object store.
            let read_at = ObjectStoreReadAt::new(object_store.clone(), path.into(), None);
            return self.open(read_at).await;
        }

        // Local disk is too fast to justify prefetching.
        self.open(TokioFile::open(local_path)?).await
    }
}
148
/// Options controlling how a [`GenericVortexFile`] performs I/O.
#[derive(Clone)]
pub struct GenericFileOptions {
    /// The number of concurrent I/O requests to spawn.
    /// This should be smaller than execution concurrency for coalescing to occur.
    io_concurrency: usize,
    /// The dispatcher to use for I/O requests.
    io_dispatcher: IoDispatcher,
}
157
158impl Default for GenericFileOptions {
159    fn default() -> Self {
160        Self {
161            io_concurrency: 8,
162            io_dispatcher: IoDispatcher::shared(),
163        }
164    }
165}