use std::sync::Arc;
use async_trait::async_trait;
use futures::FutureExt;
use parking_lot::Mutex;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use crate::segments::SegmentFuture;
use crate::segments::SegmentId;
use crate::segments::SegmentSink;
use crate::segments::SegmentSource;
use crate::sequence::SequenceId;
#[derive(Default, Clone)]
pub struct TestSegments {
segments: Arc<Mutex<Vec<ByteBuffer>>>,
}
impl SegmentSource for TestSegments {
fn request(&self, id: SegmentId) -> SegmentFuture {
let buffer = self.segments.lock().get(*id as usize).cloned();
async move {
buffer
.map(BufferHandle::new_host)
.ok_or_else(|| vortex_err!("Segment not found"))
}
.boxed()
}
}
#[async_trait]
impl SegmentSink for TestSegments {
async fn write(
&self,
_sequence_id: SequenceId,
buffers: Vec<ByteBuffer>,
) -> VortexResult<SegmentId> {
let mut buffer = ByteBufferMut::empty();
for segment in buffers {
buffer.extend_from_slice(segment.as_ref());
}
let mut segments = self.segments.lock();
let segment_id =
SegmentId::from(u32::try_from(segments.len()).vortex_expect("Too many segments"));
segments.push(buffer.freeze());
Ok(segment_id)
}
}