use std::io::{self, Read};
use std::sync::mpsc::{SendError, Sender, SyncSender};
use std::sync::{Arc, Mutex};
use stellar_xdr::next::{LedgerCloseMeta, Limits, Type, TypeVariant};
/// Depth limit passed to `Limits::depth` when the read loops in this file wrap the
/// input reader for XDR decoding.
pub const DEFAULT_XDR_RW_DEPTH_LIMIT: u32 = 500;
/// Capacity, in bytes (10 MiB), of the `io::BufReader` that
/// `BufferedLedgerMetaReader::new` places around the supplied reader.
const META_PIPE_BUFFER_SIZE: usize = 10 * 1024 * 1024;
/// Initial capacity, in entries, of the vector that caches decoded results in
/// single-thread mode.
const LEDGER_READ_AHEAD_BUFFER_SIZE: usize = 20;
#[derive(thiserror::Error, Debug, Clone)]
pub enum BufReaderError {
#[error("Unknown type {0}, choose one of {1:?}")]
UnknownType(String, &'static [&'static str]),
#[error("Error decoding XDR")]
ReadXdrNext,
#[error("Wants to run single-threaded mode but specified transmitter")]
UnusedTransmitter,
#[error("Wants to run multi-threaded mode but no transmitter specified")]
MissingTransmitter,
#[error("Wants to use single-threaded mode features but is multi-thread mode")]
WrongModeMultiThread,
#[error("Wants to use multi-threaded mode features but is single-thread mode")]
WrongModeSingleThread,
#[error("Cloned BufReaders must only be used for their thread mode")]
UsedClonedBufreader,
#[error("Error while sending meta to receiver {0}")]
SendError(#[from] SendError<Box<MetaResult>>),
#[error("Failed to aquire lock")]
LockError,
}
/// Owned wrapper around a decoded `LedgerCloseMeta` value.
#[derive(Clone, Debug)]
pub struct LedgerCloseMetaWrapper {
/// The wrapped ledger close meta.
pub ledger_close_meta: LedgerCloseMeta,
}
impl LedgerCloseMetaWrapper {
    /// Takes ownership of an already-decoded `LedgerCloseMeta` and wraps it.
    fn new(ledger_close_meta: LedgerCloseMeta) -> Self {
        Self { ledger_close_meta }
    }
}
impl From<Type> for LedgerCloseMetaWrapper {
    /// Unboxes the payload of a `Type::LedgerCloseMeta` value into a wrapper.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` when `value` is any other `Type` variant. The read loops
    /// in this file only convert values decoded as `TypeVariant::LedgerCloseMeta`.
    fn from(value: Type) -> Self {
        if let Type::LedgerCloseMeta(boxed) = value {
            return Self::new(*boxed);
        }
        unreachable!()
    }
}
/// Outcome of decoding one framed entry from the input.
///
/// The read loops in this file populate exactly one of the two fields: the decoded
/// meta on success, or `BufReaderError::ReadXdrNext` on a decode failure.
#[derive(Clone, Debug)]
pub struct MetaResult {
/// Decoded ledger close meta; `Some` when decoding succeeded.
pub ledger_close_meta: Option<LedgerCloseMetaWrapper>,
/// Decode error; `Some` when decoding failed.
pub err: Option<BufReaderError>,
}
/// Selects how a `BufferedLedgerMetaReader` hands out decoded results.
#[derive(PartialEq, Eq, Clone)]
pub enum BufferedLedgerMetaReaderMode {
/// Results are pushed into an internal, mutex-protected vector and fetched with
/// `read_meta`.
SingleThread,
/// Results are sent through one of the transmitters supplied at construction.
MultiThread,
}
/// Decodes framed `LedgerCloseMeta` entries from a buffered input and either caches
/// them (single-thread mode) or forwards them over a channel (multi-thread mode).
pub struct BufferedLedgerMetaReader {
/// Operating mode chosen at construction; every read method checks it first.
mode: BufferedLedgerMetaReaderMode,
/// Buffered input. `new` always sets it; `Clone` leaves it `None`.
reader: Option<io::BufReader<Box<dyn Read + Send>>>,
/// Cache of decoded results. `new` sets it in single-thread mode only; `Clone`
/// leaves it `None`.
cached: Option<Arc<Mutex<Vec<MetaResult>>>>,
/// Unbounded std channel sender, used by the blocking multi-thread read loop when
/// no sync sender is present.
transmitter: Option<Sender<Box<MetaResult>>>,
/// Std sync channel sender; the blocking multi-thread read loop prefers it over
/// `transmitter`.
sync_transmitter: Option<SyncSender<Box<MetaResult>>>,
/// Tokio unbounded sender, used by the async multi-thread read loop.
async_transmitter: Option<tokio::sync::mpsc::UnboundedSender<Box<MetaResult>>>,
/// Tokio bounded sender. Stored by `new`, but no code visible in this file reads it.
async_transmitter_bounded: Option<tokio::sync::mpsc::Sender<Box<MetaResult>>>,
/// `true` only on instances produced by `Clone`; read and cache methods reject such
/// instances with `BufReaderError::UsedClonedBufreader`.
cloned: bool,
}
impl Clone for BufferedLedgerMetaReader {
    /// Produces a detached copy that keeps only the thread mode.
    ///
    /// The copy carries no input reader, no cache and no transmitters, and is flagged
    /// as `cloned`, so every read or cache operation on it fails with
    /// `BufReaderError::UsedClonedBufreader`.
    fn clone(&self) -> Self {
        let mode = self.mode.clone();
        Self {
            mode,
            cloned: true,
            // None of the I/O or channel resources are shared with the copy.
            reader: None,
            cached: None,
            transmitter: None,
            sync_transmitter: None,
            async_transmitter: None,
            async_transmitter_bounded: None,
        }
    }
}
impl BufferedLedgerMetaReader {
    /// Builds a reader around `reader`, validating the transmitter set against `mode`.
    ///
    /// The input is wrapped in an `io::BufReader` with `META_PIPE_BUFFER_SIZE` bytes of
    /// capacity. In single-thread mode an empty cache with room for
    /// `LEDGER_READ_AHEAD_BUFFER_SIZE` entries is allocated; in multi-thread mode no
    /// cache is created.
    ///
    /// # Errors
    ///
    /// * `MissingTransmitter` when both `transmitter` and `sync_transmitter` are
    ///   supplied (checked first, regardless of mode).
    /// * `UnusedTransmitter` in single-thread mode when either `transmitter` or
    ///   `sync_transmitter` is supplied.
    /// * `MissingTransmitter` in multi-thread mode when none of `transmitter`,
    ///   `sync_transmitter` and `async_transmitter` is supplied.
    ///
    /// `async_transmitter_bounded` takes no part in validation; it is only stored.
    pub fn new(
        mode: BufferedLedgerMetaReaderMode,
        reader: Box<dyn Read + Send>,
        transmitter: Option<std::sync::mpsc::Sender<Box<MetaResult>>>,
        sync_transmitter: Option<SyncSender<Box<MetaResult>>>,
        async_transmitter: Option<tokio::sync::mpsc::UnboundedSender<Box<MetaResult>>>,
        async_transmitter_bounded: Option<tokio::sync::mpsc::Sender<Box<MetaResult>>>,
    ) -> Result<Self, BufReaderError> {
        let reader = io::BufReader::with_capacity(META_PIPE_BUFFER_SIZE, reader);

        let has_std_tx = transmitter.is_some();
        let has_sync_tx = sync_transmitter.is_some();
        // Supplying both blocking senders at once is rejected before the mode is
        // even looked at.
        if has_std_tx && has_sync_tx {
            return Err(BufReaderError::MissingTransmitter);
        }
        let has_blocking_tx = has_std_tx || has_sync_tx;

        let cached = match mode {
            BufferedLedgerMetaReaderMode::SingleThread if has_blocking_tx => {
                return Err(BufReaderError::UnusedTransmitter);
            }
            BufferedLedgerMetaReaderMode::SingleThread => {
                let buffer = Vec::with_capacity(LEDGER_READ_AHEAD_BUFFER_SIZE);
                Some(Arc::new(Mutex::new(buffer)))
            }
            BufferedLedgerMetaReaderMode::MultiThread
                if !has_blocking_tx && async_transmitter.is_none() =>
            {
                return Err(BufReaderError::MissingTransmitter);
            }
            BufferedLedgerMetaReaderMode::MultiThread => None,
        };

        Ok(Self {
            mode,
            reader: Some(reader),
            cached,
            transmitter,
            sync_transmitter,
            async_transmitter,
            async_transmitter_bounded,
            cloned: false,
        })
    }

    /// Returns the mode this reader was constructed with.
    pub fn thread_mode(&self) -> &BufferedLedgerMetaReaderMode {
        &self.mode
    }
}
/// Operations available on a reader running in single-thread mode, where decoded
/// results are accumulated internally instead of being sent over a channel.
pub trait SingleThreadBufferedLedgerMetaReader {
/// Reads framed entries from the input and stores each result in the internal cache.
fn single_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError>;
/// Returns the results accumulated so far.
fn read_meta(&self) -> Result<Vec<MetaResult>, BufReaderError>;
/// Discards the accumulated results.
fn clear_buffered(&mut self) -> Result<(), BufReaderError>;
}
/// Operation available on a reader running in multi-thread mode, where decoded
/// results are forwarded over a channel.
pub trait MultiThreadBufferedLedgerMetaReader {
/// Reads framed entries from the input and sends each result to the receiver.
fn multi_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError>;
}
impl SingleThreadBufferedLedgerMetaReader for BufferedLedgerMetaReader {
fn single_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError> {
if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
return Err(BufReaderError::WrongModeMultiThread);
}
if self.cloned {
return Err(BufReaderError::UsedClonedBufreader);
}
let mut reader = self.reader.as_mut().unwrap();
let mut xdr_reader =
stellar_xdr::next::Limited::new(&mut reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));
for t in stellar_xdr::next::Type::read_xdr_framed_iter(
TypeVariant::LedgerCloseMeta,
&mut xdr_reader,
) {
let meta_obj = match t {
Ok(ledger_close_meta) => MetaResult {
ledger_close_meta: Some(ledger_close_meta.into()),
err: None,
},
Err(_) => MetaResult {
ledger_close_meta: None,
err: Some(BufReaderError::ReadXdrNext),
},
};
self.cached
.as_ref()
.unwrap()
.lock()
.map_err(|_| BufReaderError::LockError)?
.push(meta_obj);
}
Ok(())
}
fn read_meta(&self) -> Result<Vec<MetaResult>, BufReaderError> {
if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
return Err(BufReaderError::WrongModeMultiThread);
}
if self.cloned {
return Err(BufReaderError::UsedClonedBufreader);
}
let locked = self
.cached
.as_ref()
.unwrap()
.lock()
.map_err(|_| BufReaderError::LockError)?;
Ok((*locked).clone())
}
fn clear_buffered(&mut self) -> Result<(), BufReaderError> {
if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
return Err(BufReaderError::WrongModeMultiThread);
}
if self.cloned {
return Err(BufReaderError::UsedClonedBufreader);
}
self.cached = Some(Arc::new(Mutex::new(Vec::with_capacity(
LEDGER_READ_AHEAD_BUFFER_SIZE,
))));
Ok(())
}
}
impl MultiThreadBufferedLedgerMetaReader for BufferedLedgerMetaReader {
    /// Decodes every framed `LedgerCloseMeta` entry the iterator yields and sends one
    /// boxed `MetaResult` per entry over a blocking std channel.
    ///
    /// The sync sender is preferred; the plain std sender is used when no sync sender
    /// was supplied. Decode failures are forwarded as a `MetaResult` whose `err` is
    /// `ReadXdrNext`, and the loop carries on.
    ///
    /// NOTE(review): whether the framed iterator stops after yielding an error is
    /// decided by `stellar_xdr`; confirm this loop cannot spin on a persistent
    /// read failure.
    ///
    /// # Errors
    ///
    /// * `WrongModeSingleThread` if the reader is not in multi-thread mode.
    /// * `UsedClonedBufreader` if this instance came from `Clone`.
    /// * `MissingTransmitter` if neither a std nor a sync sender was supplied (for
    ///   example when the reader was built with only an async sender). This used to
    ///   panic on an `unwrap` after the first frame had already been consumed.
    /// * `SendError` if the receiving side of the channel has gone away.
    fn multi_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::MultiThread {
            return Err(BufReaderError::WrongModeSingleThread);
        }
        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        // Resolve the destination before touching the input so that a
        // misconfigured reader fails cleanly without consuming any data.
        let sync_tx = self.sync_transmitter.as_ref();
        let std_tx = self.transmitter.as_ref();
        if sync_tx.is_none() && std_tx.is_none() {
            return Err(BufReaderError::MissingTransmitter);
        }

        // `new` always installs the reader, and clones were rejected above.
        let mut reader = self.reader.as_mut().unwrap();
        let mut xdr_reader =
            stellar_xdr::next::Limited::new(&mut reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));

        for t in stellar_xdr::next::Type::read_xdr_framed_iter(
            TypeVariant::LedgerCloseMeta,
            &mut xdr_reader,
        ) {
            let meta_obj = Box::new(match t {
                Ok(ledger_close_meta) => MetaResult {
                    ledger_close_meta: Some(ledger_close_meta.into()),
                    err: None,
                },
                Err(_) => MetaResult {
                    ledger_close_meta: None,
                    err: Some(BufReaderError::ReadXdrNext),
                },
            });

            match (sync_tx, std_tx) {
                (Some(tx), _) => tx.send(meta_obj)?,
                (None, Some(tx)) => tx.send(meta_obj)?,
                // Ruled out by the guard above; kept so the match stays exhaustive
                // without introducing a panic path.
                (None, None) => return Err(BufReaderError::MissingTransmitter),
            }
        }
        Ok(())
    }
}
impl BufferedLedgerMetaReader {
pub async fn async_multi_thread_read_ledger_meta_from_pipe(
&mut self,
) -> Result<(), BufReaderError> {
if self.mode != BufferedLedgerMetaReaderMode::MultiThread {
return Err(BufReaderError::WrongModeSingleThread);
}
if self.cloned {
return Err(BufReaderError::UsedClonedBufreader);
}
let mut reader = self.reader.as_mut().unwrap();
let mut xdr_reader =
stellar_xdr::next::Limited::new(&mut reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));
for t in stellar_xdr::next::Type::read_xdr_framed_iter(
TypeVariant::LedgerCloseMeta,
&mut xdr_reader,
) {
println!("\n\n Inner got result");
let meta_obj = match t {
Ok(ledger_close_meta) => MetaResult {
ledger_close_meta: Some(ledger_close_meta.into()),
err: None,
},
Err(_) => MetaResult {
ledger_close_meta: None,
err: Some(BufReaderError::ReadXdrNext),
},
};
if let Some(tx) = self.async_transmitter.as_ref() {
let transmit = tx.send(Box::new(meta_obj));
if transmit.is_err() {
log::error!(
"Failed to transmit ledger close: {:?}",
transmit.err().unwrap()
)
}
}
}
Ok(())
}
}