vortex_io/file/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::{Arc, LazyLock};
5
6use futures::future::BoxFuture;
7use futures::stream::BoxStream;
8use futures::{FutureExt, StreamExt};
9use vortex_buffer::{ByteBuffer, ByteBufferMut};
10use vortex_error::{VortexExpect, VortexResult, vortex_err};
11
12use crate::file::IoRequest;
13use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
14use crate::runtime::Handle;
15
16impl IntoReadSource for ByteBuffer {
17    fn into_read_source(self, _handle: Handle) -> VortexResult<ReadSourceRef> {
18        Ok(Arc::new(self))
19    }
20}
21
22impl ReadSource for ByteBuffer {
23    fn uri(&self) -> &Arc<str> {
24        static URI: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from(":in-memory:"));
25        &URI
26    }
27
28    fn coalesce_window(&self) -> Option<CoalesceWindow> {
29        None
30    }
31
32    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
33        let len = self.len() as u64;
34        async move { Ok(len) }.boxed()
35    }
36
37    fn drive_send(
38        self: Arc<Self>,
39        mut requests: BoxStream<'static, IoRequest>,
40    ) -> BoxFuture<'static, ()> {
41        let buffer = self;
42        async move {
43            while let Some(req) = requests.next().await {
44                let offset = usize::try_from(req.offset())
45                    .vortex_expect("In-memory buffer offset exceeds usize");
46                let len = req.len();
47
48                let result = if offset + len > buffer.len() {
49                    Err(vortex_err!("Read out of bounds"))
50                } else {
51                    let mut slice = ByteBufferMut::with_capacity_aligned(len, req.alignment());
52                    unsafe { slice.set_len(len) };
53                    slice
54                        .as_mut_slice()
55                        .copy_from_slice(&buffer.as_slice()[offset..offset + len]);
56                    Ok(slice.freeze())
57                };
58                req.resolve(result);
59            }
60        }
61        .boxed()
62    }
63}