vortex_file/segments/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::TryFutureExt;
8use vortex_buffer::BufferHandle;
9use vortex_error::VortexError;
10use vortex_error::vortex_err;
11use vortex_io::VortexReadAt;
12use vortex_layout::segments::SegmentFuture;
13use vortex_layout::segments::SegmentId;
14use vortex_layout::segments::SegmentSource;
15
16use crate::SegmentSpec;
17
18pub struct FileSegmentSource {
19    segments: Arc<[SegmentSpec]>,
20    read: Arc<dyn VortexReadAt>,
21}
22
23impl FileSegmentSource {
24    pub fn new(segments: Arc<[SegmentSpec]>, read: Arc<dyn VortexReadAt>) -> Self {
25        Self { segments, read }
26    }
27}
28
29impl SegmentSource for FileSegmentSource {
30    fn request(&self, id: SegmentId) -> SegmentFuture {
31        // We eagerly create the read future here assuming the behaviour of [`FileRead`], where
32        // coalescing becomes effective prior to the future being polled.
33        let maybe_fut = self.segments.get(*id as usize).cloned().map(|spec| {
34            self.read
35                .clone()
36                .read_at(spec.offset, spec.length as usize, spec.alignment)
37                .map_err(VortexError::from)
38        });
39
40        async move {
41            maybe_fut
42                .ok_or_else(|| vortex_err!("Missing segment: {}", id))?
43                .await
44                .map(BufferHandle::Buffer)
45        }
46        .boxed()
47    }
48}