vortex_file/
memory.rs

1use std::sync::Arc;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use vortex_buffer::ByteBuffer;
6use vortex_error::{VortexExpect, VortexResult, vortex_err};
7use vortex_layout::segments::{SegmentId, SegmentSource};
8use vortex_metrics::VortexMetrics;
9
10use crate::{FileType, Footer, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
11
/// [`FileType`] for Vortex files whose bytes are already fully held in an in-memory buffer.
///
/// Reads are served by zero-copy slicing of that buffer; there is no request coalescing or
/// other I/O orchestration involved.
pub struct InMemoryFileType;
17
18impl FileType for InMemoryFileType {
19    type Options = ();
20}
21
22impl VortexOpenOptions<InMemoryFileType> {
23    /// Create open options for an in-memory Vortex file.
24    pub fn in_memory() -> Self {
25        Self::new(())
26    }
27
28    /// Open an in-memory file contained in the provided buffer.
29    pub async fn open<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
30        let buffer = buffer.into();
31
32        let postscript = self.parse_postscript(&buffer)?;
33
34        // If we haven't been provided a DType, we must read one from the file.
35        let dtype = self.dtype
36            .clone()
37            .map(Ok)
38            .unwrap_or_else(|| {
39                let dtype_segment = postscript
40                    .dtype
41                    .ok_or_else(|| vortex_err!("Vortex file doesn't embed a DType and one has not been provided to VortexOpenOptions"))?;
42                self.parse_dtype(0, &buffer, &dtype_segment)
43            })?;
44
45        let file_stats = postscript
46            .statistics
47            .map(|segment| self.parse_file_statistics(0, &buffer, &segment))
48            .transpose()?;
49
50        let footer = self.parse_footer(
51            0,
52            &buffer,
53            &postscript.footer,
54            &postscript.layout,
55            dtype,
56            file_stats,
57        )?;
58
59        let segment_source_factory = Arc::new(InMemorySegmentReader {
60            buffer,
61            footer: footer.clone(),
62        });
63
64        Ok(VortexFile {
65            footer,
66            segment_source_factory,
67            metrics: self.metrics,
68        })
69    }
70}
71
72#[derive(Clone)]
73struct InMemorySegmentReader {
74    buffer: ByteBuffer,
75    footer: Footer,
76}
77
78impl SegmentSourceFactory for InMemorySegmentReader {
79    fn segment_source(&self, _metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
80        Arc::new(self.clone())
81    }
82}
83
84impl SegmentSource for InMemorySegmentReader {
85    fn request(
86        &self,
87        id: SegmentId,
88        _for_whom: &Arc<str>,
89    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
90        let segment_map = self.footer.segment_map().clone();
91        let buffer = self.buffer.clone();
92
93        async move {
94            let segment: &SegmentSpec = segment_map
95                .get(*id as usize)
96                .ok_or_else(|| vortex_err!("segment not found"))?;
97
98            let start =
99                usize::try_from(segment.offset).vortex_expect("segment offset larger than usize");
100            let end = start + segment.length as usize;
101
102            Ok(buffer.slice(start..end))
103        }
104        .boxed()
105    }
106}