vortex_file/
memory.rs

1use std::sync::Arc;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use vortex_buffer::ByteBuffer;
6use vortex_error::{VortexExpect, VortexResult, vortex_err};
7use vortex_layout::segments::{SegmentId, SegmentSource};
8use vortex_metrics::VortexMetrics;
9
10use crate::{FileType, Footer, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
11
12/// A Vortex file that is backed by an in-memory buffer.
13///
14/// This type of file reader performs no coalescing or other clever orchestration, simply
15/// zero-copy slicing the segments from the buffer.
16pub struct InMemoryVortexFile;
17
18impl FileType for InMemoryVortexFile {
19    type Options = ();
20}
21
22impl VortexOpenOptions<InMemoryVortexFile> {
23    /// Create open options for an in-memory Vortex file.
24    pub fn in_memory() -> Self {
25        Self::new(())
26    }
27
28    /// Open an in-memory file contained in the provided buffer.
29    pub async fn open<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
30        let buffer = buffer.into();
31        let footer = self.read_footer(&buffer).await?;
32        let segment_source_factory = Arc::new(InMemorySegmentReader {
33            buffer,
34            footer: footer.clone(),
35        });
36
37        Ok(VortexFile {
38            footer,
39            segment_source_factory,
40            metrics: self.metrics,
41        })
42    }
43}
44
45#[derive(Clone)]
46struct InMemorySegmentReader {
47    buffer: ByteBuffer,
48    footer: Footer,
49}
50
51impl SegmentSourceFactory for InMemorySegmentReader {
52    fn segment_source(&self, _metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
53        Arc::new(self.clone())
54    }
55}
56
57impl SegmentSource for InMemorySegmentReader {
58    fn request(
59        &self,
60        id: SegmentId,
61        _for_whom: &Arc<str>,
62    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
63        let segment_map = self.footer.segment_map().clone();
64        let buffer = self.buffer.clone();
65
66        async move {
67            let segment: &SegmentSpec = segment_map
68                .get(*id as usize)
69                .ok_or_else(|| vortex_err!("segment not found"))?;
70
71            let start =
72                usize::try_from(segment.offset).vortex_expect("segment offset larger than usize");
73            let end = start + segment.length as usize;
74
75            Ok(buffer.slice(start..end))
76        }
77        .boxed()
78    }
79}