Skip to main content

vortex_datafusion/persistent/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Factory for creating [`VortexReadAt`] instances from [`PartitionedFile`]s.
5
6use std::fmt::Debug;
7use std::sync::Arc;
8
9use datafusion_common::Result as DFResult;
10use datafusion_datasource::PartitionedFile;
11use object_store::ObjectStore;
12use vortex::array::memory::MemorySessionExt;
13use vortex::io::VortexReadAt;
14use vortex::io::object_store::ObjectStoreReadAt;
15use vortex::io::session::RuntimeSessionExt;
16use vortex::session::VortexSession;
17
18/// Factory to create [`VortexReadAt`] instances for a `PartitionedFile`.
19///
20/// Plug a custom implementation into [`VortexSource::with_vortex_reader_factory`]
21/// when the default object-store reader is not sufficient, for example to:
22///
23/// - reuse an application-level metadata cache,
24/// - wrap reads with custom authentication or routing,
25/// - coalesce I/O in a remote storage layer.
26///
27/// [`VortexSource::with_vortex_reader_factory`]: crate::VortexSource::with_vortex_reader_factory
28pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
29    /// Create a reader for a target object.
30    fn create_reader(
31        &self,
32        file: &PartitionedFile,
33        session: &VortexSession,
34    ) -> DFResult<Arc<dyn VortexReadAt>>;
35}
36
37/// Default [`VortexReaderFactory`] backed by DataFusion's [`ObjectStore`].
38///
39/// This is the reader used by [`crate::VortexSource`] and
40/// [`crate::VortexFormat`] unless a
41/// custom factory is supplied. It works with any object store that DataFusion
42/// has registered for the scan.
43#[derive(Debug)]
44pub struct DefaultVortexReaderFactory {
45    object_store: Arc<dyn ObjectStore>,
46}
47
48impl DefaultVortexReaderFactory {
49    /// Creates a new factory from an [`ObjectStore`].
50    ///
51    /// # Example
52    ///
53    /// ```rust
54    /// # use std::sync::Arc;
55    /// # use object_store::memory::InMemory;
56    /// use vortex_datafusion::reader::DefaultVortexReaderFactory;
57    ///
58    /// let factory = DefaultVortexReaderFactory::new(Arc::new(InMemory::new()));
59    /// # let _ = factory;
60    /// ```
61    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
62        Self { object_store }
63    }
64}
65
66impl VortexReaderFactory for DefaultVortexReaderFactory {
67    fn create_reader(
68        &self,
69        file: &PartitionedFile,
70        session: &VortexSession,
71    ) -> DFResult<Arc<dyn VortexReadAt>> {
72        Ok(Arc::new(ObjectStoreReadAt::new_with_allocator(
73            Arc::clone(&self.object_store),
74            file.path().clone(),
75            session.handle(),
76            session.allocator(),
77        )) as _)
78    }
79}