use std::io::Read;
use protobuf::Message;
use crate::error::InternalError;
use crate::protocol::transaction::Transaction;
use crate::protos::FromProto;
use super::TransactionReader;
pub struct ProtobufTransactionReader<'a> {
source: protobuf::CodedInputStream<'a>,
}
impl<'a> ProtobufTransactionReader<'a> {
pub fn new(source: &'a mut dyn Read) -> Self {
let source = protobuf::CodedInputStream::new(source);
ProtobufTransactionReader { source }
}
}
impl TransactionReader for ProtobufTransactionReader<'_> {
fn next(&mut self, max_txns: usize) -> Result<Vec<Transaction>, InternalError> {
let mut results = Vec::with_capacity(max_txns);
for _ in 0..max_txns {
let eof = self
.source
.eof()
.map_err(|err| InternalError::from_source(Box::new(err)))?;
if eof {
break;
}
let next_len = self
.source
.read_raw_varint32()
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let buf = self
.source
.read_raw_bytes(next_len)
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let msg = Message::parse_from_bytes(&buf)
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let txn = Transaction::from_proto(msg)
.map_err(|err| InternalError::from_source(Box::new(err)))?;
results.push(txn);
}
Ok(results)
}
}