vortex_layout/segments/
test.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use parking_lot::Mutex;
9use vortex_buffer::BufferHandle;
10use vortex_buffer::ByteBuffer;
11use vortex_buffer::ByteBufferMut;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_err;
15
16use crate::segments::SegmentFuture;
17use crate::segments::SegmentId;
18use crate::segments::SegmentSink;
19use crate::segments::SegmentSource;
20use crate::sequence::SequenceId;
21
22/// A dummy in-memory implementation of a segment reader and writer.
23#[derive(Default, Clone)]
24pub struct TestSegments {
25    segments: Arc<Mutex<Vec<ByteBuffer>>>,
26}
27
28impl SegmentSource for TestSegments {
29    fn request(&self, id: SegmentId) -> SegmentFuture {
30        let buffer = self.segments.lock().get(*id as usize).cloned();
31        async move {
32            buffer
33                .map(BufferHandle::Buffer)
34                .ok_or_else(|| vortex_err!("Segment not found"))
35        }
36        .boxed()
37    }
38}
39
40#[async_trait]
41impl SegmentSink for TestSegments {
42    async fn write(
43        &self,
44        _sequence_id: SequenceId,
45        buffers: Vec<ByteBuffer>,
46    ) -> VortexResult<SegmentId> {
47        // Combine all the buffers since we're only a test implementation
48        let mut buffer = ByteBufferMut::empty();
49        for segment in buffers {
50            buffer.extend_from_slice(segment.as_ref());
51        }
52
53        let mut segments = self.segments.lock();
54        let segment_id =
55            SegmentId::from(u32::try_from(segments.len()).vortex_expect("Too many segments"));
56        segments.push(buffer.freeze());
57
58        Ok(segment_id)
59    }
60}