1use std::sync::Arc;
2
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use vortex_buffer::ByteBuffer;
6use vortex_error::{VortexExpect, VortexResult, vortex_err};
7use vortex_layout::segments::{SegmentId, SegmentSource};
8use vortex_metrics::VortexMetrics;
9
10use crate::{FileType, Footer, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
11
12pub struct InMemoryVortexFile;
17
18impl FileType for InMemoryVortexFile {
19 type Options = ();
20}
21
22impl VortexOpenOptions<InMemoryVortexFile> {
23 pub fn in_memory() -> Self {
25 Self::new(())
26 }
27
28 pub async fn open<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
30 let buffer = buffer.into();
31 let footer = self.read_footer(&buffer).await?;
32 let segment_source_factory = Arc::new(InMemorySegmentReader {
33 buffer,
34 footer: footer.clone(),
35 });
36
37 Ok(VortexFile {
38 footer,
39 segment_source_factory,
40 metrics: self.metrics,
41 })
42 }
43}
44
45#[derive(Clone)]
46struct InMemorySegmentReader {
47 buffer: ByteBuffer,
48 footer: Footer,
49}
50
51impl SegmentSourceFactory for InMemorySegmentReader {
52 fn segment_source(&self, _metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
53 Arc::new(self.clone())
54 }
55}
56
57impl SegmentSource for InMemorySegmentReader {
58 fn request(
59 &self,
60 id: SegmentId,
61 _for_whom: &Arc<str>,
62 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
63 let segment_map = self.footer.segment_map().clone();
64 let buffer = self.buffer.clone();
65
66 async move {
67 let segment: &SegmentSpec = segment_map
68 .get(*id as usize)
69 .ok_or_else(|| vortex_err!("segment not found"))?;
70
71 let start =
72 usize::try_from(segment.offset).vortex_expect("segment offset larger than usize");
73 let end = start + segment.length as usize;
74
75 Ok(buffer.slice(start..end))
76 }
77 .boxed()
78 }
79}