use crate::errors::ParquetError;
use crate::file::reader::{ChunkReader, Length};
use bytes::Bytes;
use std::fmt::Display;
use std::ops::Range;
/// In-memory buffers covering specific byte ranges of an underlying file,
/// together with a current read position (`offset`).
///
/// Invariant: `ranges` and `buffers` are parallel vectors — `buffers[i]`
/// holds exactly the bytes for `ranges[i]` (enforced by `push_range`).
#[derive(Debug, Clone)]
pub(crate) struct PushBuffers {
    // Current read position, advanced by the `Read` impl and set by `with_offset`
    offset: u64,
    // Total length of the underlying file in bytes (may exceed buffered data)
    file_len: u64,
    // Byte ranges for which data has been pushed; parallel to `buffers`
    ranges: Vec<Range<u64>>,
    // Buffered data; parallel to `ranges`
    buffers: Vec<Bytes>,
}
impl Display for PushBuffers {
    /// Renders a human-readable summary: a header with the current offset and
    /// file length, followed by one line per buffered range.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "Buffers (offset: {}, file_len: {})",
            self.offset, self.file_len
        )?;
        writeln!(f, "Available Ranges (w/ offset):")?;
        self.ranges.iter().try_for_each(|r| {
            let byte_count = r.end - r.start;
            writeln!(
                f,
                "  {}..{} ({}..{}): {} bytes",
                r.start,
                r.end,
                r.start + self.offset,
                r.end + self.offset,
                byte_count
            )
        })
    }
}
impl PushBuffers {
    /// Creates an empty buffer set for a file of `file_len` bytes.
    pub fn new(file_len: u64) -> Self {
        Self {
            offset: 0,
            file_len,
            ranges: Vec::new(),
            buffers: Vec::new(),
        }
    }

    /// Adds several (range, buffer) pairs at once.
    ///
    /// # Panics
    /// Panics if the two vectors differ in length, or if any buffer's length
    /// does not match its range's length.
    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        assert_eq!(
            ranges.len(),
            buffers.len(),
            "Number of ranges must match number of buffers"
        );
        ranges
            .into_iter()
            .zip(buffers)
            .for_each(|(range, buffer)| self.push_range(range, buffer));
    }

    /// Adds a single (range, buffer) pair.
    ///
    /// # Panics
    /// Panics if `buffer.len()` differs from the length of `range`.
    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
        let expected_len = (range.end - range.start) as usize;
        assert_eq!(
            expected_len,
            buffer.len(),
            "Range length must match buffer length"
        );
        self.ranges.push(range);
        self.buffers.push(buffer);
    }

    /// Returns true if `range` is fully contained within some buffered range.
    pub fn has_range(&self, range: &Range<u64>) -> bool {
        self.ranges
            .iter()
            .any(|candidate| candidate.start <= range.start && range.end <= candidate.end)
    }

    /// Iterates over the parallel (range, buffer) pairs.
    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
        self.ranges.iter().zip(&self.buffers)
    }

    /// Total length of the underlying file in bytes.
    pub fn file_len(&self) -> u64 {
        self.file_len
    }

    /// Consumes self, returning the same buffers with the read position set
    /// to `offset`.
    pub fn with_offset(mut self, offset: u64) -> Self {
        self.offset = offset;
        self
    }

    /// Sum of the lengths of all buffered ranges.
    #[cfg(feature = "arrow")]
    pub fn buffered_bytes(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }

    /// Drops every buffered range that exactly equals one of
    /// `ranges_to_clear` (overlapping-but-unequal ranges are kept).
    #[cfg(feature = "arrow")]
    pub fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
        let (kept_ranges, kept_buffers): (Vec<_>, Vec<_>) = self
            .iter()
            // Range<u64>: PartialEq, so `contains` is the same exact-match
            // test as comparing start and end individually
            .filter(|(range, _)| !ranges_to_clear.contains(*range))
            .map(|(range, buffer)| (range.clone(), buffer.clone()))
            .unzip();
        self.ranges = kept_ranges;
        self.buffers = kept_buffers;
    }
}
impl Length for PushBuffers {
    /// Reports the full underlying file length, not just the buffered bytes.
    fn len(&self) -> u64 {
        self.file_len()
    }
}
impl std::io::Read for PushBuffers {
    /// All-or-nothing read at the current offset: succeeds only when a single
    /// buffered range covers the entire request, otherwise returns
    /// `UnexpectedEof` (no partial reads are served).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let wanted = buf.len() as u64;
        // Locate a range covering [offset, offset + wanted) and slice out the
        // corresponding bytes; the owned `Bytes` ends the borrow of `self`.
        let hit = self
            .iter()
            .find(|(range, _)| range.start <= self.offset && range.end >= self.offset + wanted)
            .map(|(range, data)| {
                let begin = (self.offset - range.start) as usize;
                data.slice(begin..begin + buf.len())
            });
        match hit {
            Some(bytes) => {
                buf.copy_from_slice(bytes.as_ref());
                self.offset += wanted;
                Ok(buf.len())
            }
            None => Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "No data available in Buffers",
            )),
        }
    }
}
impl ChunkReader for PushBuffers {
    type T = Self;

    /// Returns a cloned reader positioned `start` bytes past the current
    /// offset; the clone shares the same buffered data.
    fn get_read(&self, start: u64) -> Result<Self::T, ParquetError> {
        let positioned = self.clone().with_offset(self.offset + start);
        Ok(positioned)
    }

    /// Returns `length` bytes starting at absolute position `start`, or
    /// `NeedMoreDataRange` when no single buffered range covers the request.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
        let found = self
            .iter()
            .find(|(range, _)| range.start <= start && range.end >= start + length as u64);
        match found {
            Some((range, data)) => {
                let begin = (start - range.start) as usize;
                Ok(data.slice(begin..begin + length))
            }
            None => Err(ParquetError::NeedMoreDataRange(start..start + length as u64)),
        }
    }
}