1use std::sync::{Arc, LazyLock};
5
6use futures::future::BoxFuture;
7use futures::stream::BoxStream;
8use futures::{FutureExt, StreamExt};
9use vortex_buffer::{ByteBuffer, ByteBufferMut};
10use vortex_error::{VortexExpect, VortexResult, vortex_err};
11
12use crate::file::IoRequest;
13use crate::file::read::{CoalesceWindow, IntoReadSource, ReadSource, ReadSourceRef};
14use crate::runtime::Handle;
15
16impl IntoReadSource for ByteBuffer {
17 fn into_read_source(self, _handle: Handle) -> VortexResult<ReadSourceRef> {
18 Ok(Arc::new(self))
19 }
20}
21
22impl ReadSource for ByteBuffer {
23 fn uri(&self) -> &Arc<str> {
24 static URI: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from(":in-memory:"));
25 &URI
26 }
27
28 fn coalesce_window(&self) -> Option<CoalesceWindow> {
29 None
30 }
31
32 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
33 let len = self.len() as u64;
34 async move { Ok(len) }.boxed()
35 }
36
37 fn drive_send(
38 self: Arc<Self>,
39 mut requests: BoxStream<'static, IoRequest>,
40 ) -> BoxFuture<'static, ()> {
41 let buffer = self;
42 async move {
43 while let Some(req) = requests.next().await {
44 let offset = usize::try_from(req.offset())
45 .vortex_expect("In-memory buffer offset exceeds usize");
46 let len = req.len();
47
48 let result = if offset + len > buffer.len() {
49 Err(vortex_err!("Read out of bounds"))
50 } else {
51 let mut slice = ByteBufferMut::with_capacity_aligned(len, req.alignment());
52 unsafe { slice.set_len(len) };
53 slice
54 .as_mut_slice()
55 .copy_from_slice(&buffer.as_slice()[offset..offset + len]);
56 Ok(slice.freeze())
57 };
58 req.resolve(result);
59 }
60 }
61 .boxed()
62 }
63}