use std::io::Read;
use protobuf::Message;
use crate::error::InternalError;
use crate::protocol::batch::BatchPair;
use crate::protos::FromProto;
use super::BatchReader;
/// A [`BatchReader`] that decodes batches from a protobuf coded input stream.
///
/// The lifetime `'a` is the lifetime of the borrowed byte source that the coded input stream
/// reads from.
pub struct ProtobufBatchReader<'a> {
/// Coded input stream wrapping the caller-supplied byte source.
source: protobuf::CodedInputStream<'a>,
}
impl<'a> ProtobufBatchReader<'a> {
    /// Creates a reader that pulls its bytes from `source`.
    ///
    /// The given reader is mutably borrowed for the lifetime of the returned value and is
    /// wrapped in a `protobuf::CodedInputStream`.
    pub fn new(source: &'a mut dyn Read) -> Self {
        Self {
            source: protobuf::CodedInputStream::new(source),
        }
    }
}
impl BatchReader for ProtobufBatchReader<'_> {
    /// Reads up to `max_batches` batches from the underlying stream.
    ///
    /// Each entry in the stream is read as a varint32 length prefix followed by that many bytes,
    /// which are parsed as a protobuf message and converted into a [`BatchPair`]. Reading stops
    /// early as soon as the stream reports end-of-file, so the returned vector may contain fewer
    /// than `max_batches` items (and is empty once the stream is exhausted).
    ///
    /// # Errors
    ///
    /// Returns an [`InternalError`] wrapping the underlying error if the end-of-file check, the
    /// length prefix read, the payload read, the protobuf parse, or the conversion to a
    /// [`BatchPair`] fails. Batches decoded earlier in the same call are dropped in that case.
    fn next(&mut self, max_batches: usize) -> Result<Vec<BatchPair>, InternalError> {
        // Upper bound on the up-front allocation. `max_batches` is only a limit, not the number
        // of batches actually present, so a caller passing a very large value (e.g. `usize::MAX`
        // to mean "read everything") must not trigger a capacity-overflow panic or a huge
        // allocation. The vector still grows on demand beyond this bound.
        const MAX_PREALLOCATED_BATCHES: usize = 1024;

        let mut results = Vec::with_capacity(max_batches.min(MAX_PREALLOCATED_BATCHES));

        for _ in 0..max_batches {
            // Stop cleanly when there is no further data instead of failing on the length read.
            let eof = self
                .source
                .eof()
                .map_err(|err| InternalError::from_source(Box::new(err)))?;
            if eof {
                break;
            }

            // Length prefix of the next message.
            let next_len = self
                .source
                .read_raw_varint32()
                .map_err(|err| InternalError::from_source(Box::new(err)))?;

            // Message payload of exactly `next_len` bytes.
            let buf = self
                .source
                .read_raw_bytes(next_len)
                .map_err(|err| InternalError::from_source(Box::new(err)))?;

            // The concrete message type is inferred from what `BatchPair::from_proto` accepts.
            let msg = Message::parse_from_bytes(&buf)
                .map_err(|err| InternalError::from_source(Box::new(err)))?;

            let batch = BatchPair::from_proto(msg)
                .map_err(|err| InternalError::from_source(Box::new(err)))?;

            results.push(batch);
        }

        Ok(results)
    }
}