1use std::sync::Arc;
5use std::sync::LazyLock;
6
7use futures::FutureExt;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16
17use crate::file::IoRequest;
18use crate::file::read::CoalesceWindow;
19use crate::file::read::IntoReadSource;
20use crate::file::read::ReadSource;
21use crate::file::read::ReadSourceRef;
22use crate::runtime::Handle;
23
24impl IntoReadSource for ByteBuffer {
25 fn into_read_source(self, _handle: Handle) -> VortexResult<ReadSourceRef> {
26 Ok(Arc::new(self))
27 }
28}
29
30impl ReadSource for ByteBuffer {
31 fn uri(&self) -> &Arc<str> {
32 static URI: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from(":in-memory:"));
33 &URI
34 }
35
36 fn coalesce_window(&self) -> Option<CoalesceWindow> {
37 None
38 }
39
40 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
41 let len = self.len() as u64;
42 async move { Ok(len) }.boxed()
43 }
44
45 fn drive_send(
46 self: Arc<Self>,
47 mut requests: BoxStream<'static, IoRequest>,
48 ) -> BoxFuture<'static, ()> {
49 let buffer = self;
50 async move {
51 while let Some(req) = requests.next().await {
52 let offset = usize::try_from(req.offset())
53 .vortex_expect("In-memory buffer offset exceeds usize");
54 let len = req.len();
55
56 let result = if offset + len > buffer.len() {
57 Err(vortex_err!("Read out of bounds"))
58 } else {
59 let mut slice = ByteBufferMut::with_capacity_aligned(len, req.alignment());
60 unsafe { slice.set_len(len) };
61 slice
62 .as_mut_slice()
63 .copy_from_slice(&buffer.as_slice()[offset..offset + len]);
64 Ok(slice.freeze())
65 };
66 req.resolve(result);
67 }
68 }
69 .boxed()
70 }
71}