vortex_layout/segments/
test.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use parking_lot::Mutex;
9use vortex_buffer::BufferHandle;
10use vortex_buffer::ByteBuffer;
11use vortex_buffer::ByteBufferMut;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_err;
15
16use crate::segments::SegmentFuture;
17use crate::segments::SegmentId;
18use crate::segments::SegmentSink;
19use crate::segments::SegmentSource;
20use crate::sequence::SequenceId;
21
22#[derive(Default, Clone)]
24pub struct TestSegments {
25 segments: Arc<Mutex<Vec<ByteBuffer>>>,
26}
27
28impl SegmentSource for TestSegments {
29 fn request(&self, id: SegmentId) -> SegmentFuture {
30 let buffer = self.segments.lock().get(*id as usize).cloned();
31 async move {
32 buffer
33 .map(BufferHandle::Buffer)
34 .ok_or_else(|| vortex_err!("Segment not found"))
35 }
36 .boxed()
37 }
38}
39
40#[async_trait]
41impl SegmentSink for TestSegments {
42 async fn write(
43 &self,
44 _sequence_id: SequenceId,
45 buffers: Vec<ByteBuffer>,
46 ) -> VortexResult<SegmentId> {
47 let mut buffer = ByteBufferMut::empty();
49 for segment in buffers {
50 buffer.extend_from_slice(segment.as_ref());
51 }
52
53 let mut segments = self.segments.lock();
54 let segment_id =
55 SegmentId::from(u32::try_from(segments.len()).vortex_expect("Too many segments"));
56 segments.push(buffer.freeze());
57
58 Ok(segment_id)
59 }
60}