1use std::sync::Arc;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use vortex_buffer::ByteBuffer;
6use vortex_error::{VortexExpect, VortexResult, vortex_err};
7use vortex_layout::segments::{SegmentId, SegmentSource};
8use vortex_metrics::VortexMetrics;
9
10use crate::{FileType, Footer, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
11
/// Marker type selecting the in-memory flavour of [`VortexOpenOptions`].
///
/// It carries no data; it only exists to be used as the `FileType` type parameter, so it
/// derives the common traits that are free for a unit struct.
#[derive(Debug, Clone, Copy, Default)]
pub struct InMemoryFileType;
17
18impl FileType for InMemoryFileType {
19 type Options = ();
20}
21
22impl VortexOpenOptions<InMemoryFileType> {
23 pub fn in_memory() -> Self {
25 Self::new(())
26 }
27
28 pub async fn open<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
30 let buffer = buffer.into();
31
32 let postscript = self.parse_postscript(&buffer)?;
33
34 let dtype = self.dtype
36 .clone()
37 .map(Ok)
38 .unwrap_or_else(|| {
39 let dtype_segment = postscript
40 .dtype
41 .ok_or_else(|| vortex_err!("Vortex file doesn't embed a DType and one has not been provided to VortexOpenOptions"))?;
42 self.parse_dtype(0, &buffer, &dtype_segment)
43 })?;
44
45 let file_stats = postscript
46 .statistics
47 .map(|segment| self.parse_file_statistics(0, &buffer, &segment))
48 .transpose()?;
49
50 let footer = self.parse_footer(
51 0,
52 &buffer,
53 &postscript.footer,
54 &postscript.layout,
55 dtype,
56 file_stats,
57 )?;
58
59 let segment_source_factory = Arc::new(InMemorySegmentReader {
60 buffer,
61 footer: footer.clone(),
62 });
63
64 Ok(VortexFile {
65 footer,
66 segment_source_factory,
67 metrics: self.metrics,
68 })
69 }
70}
71
72#[derive(Clone)]
73struct InMemorySegmentReader {
74 buffer: ByteBuffer,
75 footer: Footer,
76}
77
78impl SegmentSourceFactory for InMemorySegmentReader {
79 fn segment_source(&self, _metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
80 Arc::new(self.clone())
81 }
82}
83
84impl SegmentSource for InMemorySegmentReader {
85 fn request(
86 &self,
87 id: SegmentId,
88 _for_whom: &Arc<str>,
89 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
90 let segment_map = self.footer.segment_map().clone();
91 let buffer = self.buffer.clone();
92
93 async move {
94 let segment: &SegmentSpec = segment_map
95 .get(*id as usize)
96 .ok_or_else(|| vortex_err!("segment not found"))?;
97
98 let start =
99 usize::try_from(segment.offset).vortex_expect("segment offset larger than usize");
100 let end = start + segment.length as usize;
101
102 Ok(buffer.slice(start..end))
103 }
104 .boxed()
105 }
106}