Skip to main content

vortex_datafusion/persistent/
reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Factory for creating [`VortexReadAt`][vortex::io::VortexReadAt] instances
//! from [`PartitionedFile`]s.
6
7use std::fmt::Debug;
8use std::sync::Arc;
9
10use datafusion_common::Result as DFResult;
11use datafusion_datasource::PartitionedFile;
12use object_store::ObjectStore;
13use vortex::array::memory::MemorySessionExt;
14use vortex::io::VortexReadAt;
15use vortex::io::object_store::ObjectStoreReadAt;
16use vortex::io::session::RuntimeSessionExt;
17use vortex::session::VortexSession;
18
19/// Factory to create [`VortexReadAt`] instances for a `PartitionedFile`.
20///
21/// Plug a custom implementation into [`VortexSource::with_vortex_reader_factory`]
22/// when the default object-store reader is not sufficient, for example to:
23///
24/// - reuse an application-level metadata cache,
25/// - wrap reads with custom authentication or routing,
26/// - coalesce I/O in a remote storage layer.
27///
28/// [`VortexSource::with_vortex_reader_factory`]: crate::VortexSource::with_vortex_reader_factory
29pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
30    /// Create a reader for a target object.
31    fn create_reader(
32        &self,
33        file: &PartitionedFile,
34        session: &VortexSession,
35    ) -> DFResult<Arc<dyn VortexReadAt>>;
36}
37
38/// Default [`VortexReaderFactory`] backed by DataFusion's [`ObjectStore`].
39///
40/// This is the reader used by [`crate::VortexSource`] and
41/// [`crate::VortexFormat`] unless a
42/// custom factory is supplied. It works with any object store that DataFusion
43/// has registered for the scan.
44#[derive(Debug)]
45pub struct DefaultVortexReaderFactory {
46    object_store: Arc<dyn ObjectStore>,
47}
48
49impl DefaultVortexReaderFactory {
50    /// Creates a new factory from an [`ObjectStore`].
51    ///
52    /// # Example
53    ///
54    /// ```rust
55    /// # use std::sync::Arc;
56    /// # use object_store::memory::InMemory;
57    /// use vortex_datafusion::reader::DefaultVortexReaderFactory;
58    ///
59    /// let factory = DefaultVortexReaderFactory::new(Arc::new(InMemory::new()));
60    /// # let _ = factory;
61    /// ```
62    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
63        Self { object_store }
64    }
65}
66
67impl VortexReaderFactory for DefaultVortexReaderFactory {
68    fn create_reader(
69        &self,
70        file: &PartitionedFile,
71        session: &VortexSession,
72    ) -> DFResult<Arc<dyn VortexReadAt>> {
73        Ok(Arc::new(ObjectStoreReadAt::new_with_allocator(
74            Arc::clone(&self.object_store),
75            file.path().as_ref().into(),
76            session.handle(),
77            session.allocator(),
78        )) as _)
79    }
80}