vortex_io/file/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5use std::sync::LazyLock;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16
17use crate::file::IoRequest;
18use crate::file::read::CoalesceWindow;
19use crate::file::read::IntoReadSource;
20use crate::file::read::ReadSource;
21use crate::file::read::ReadSourceRef;
22use crate::runtime::Handle;
23
24impl IntoReadSource for ByteBuffer {
25    fn into_read_source(self, _handle: Handle) -> VortexResult<ReadSourceRef> {
26        Ok(Arc::new(self))
27    }
28}
29
30impl ReadSource for ByteBuffer {
31    fn uri(&self) -> &Arc<str> {
32        static URI: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from(":in-memory:"));
33        &URI
34    }
35
36    fn coalesce_window(&self) -> Option<CoalesceWindow> {
37        None
38    }
39
40    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
41        let len = self.len() as u64;
42        async move { Ok(len) }.boxed()
43    }
44
45    fn drive_send(
46        self: Arc<Self>,
47        mut requests: BoxStream<'static, IoRequest>,
48    ) -> BoxFuture<'static, ()> {
49        let buffer = self;
50        async move {
51            while let Some(req) = requests.next().await {
52                let offset = usize::try_from(req.offset())
53                    .vortex_expect("In-memory buffer offset exceeds usize");
54                let len = req.len();
55
56                let result = if offset + len > buffer.len() {
57                    Err(vortex_err!("Read out of bounds"))
58                } else {
59                    let mut slice = ByteBufferMut::with_capacity_aligned(len, req.alignment());
60                    unsafe { slice.set_len(len) };
61                    slice
62                        .as_mut_slice()
63                        .copy_from_slice(&buffer.as_slice()[offset..offset + len]);
64                    Ok(slice.freeze())
65                };
66                req.resolve(result);
67            }
68        }
69        .boxed()
70    }
71}