use crate::{
toml::generate_predefined_cfg, BufferedLedgerMetaReaderMode, IngestionConfig, MetaResult,
RunnerError, StellarCoreRunner, StellarCoreRunnerPublic,
};
use std::sync::mpsc::Receiver;
use stellar_xdr::next::LedgerCloseMeta;
#[derive(Clone, Copy)]
pub struct BoundedRange(pub u32, pub u32);
pub enum Range {
Bounded(BoundedRange),
}
impl From<Range> for std::ops::Range<u32> {
fn from(range: Range) -> Self {
match range {
Range::Bounded(bounded_range) => bounded_range.0..bounded_range.1,
}
}
}
impl Range {
pub fn bounded(&self) -> (u32, u32) {
match self {
Range::Bounded(bounded_range) => (bounded_range.0, bounded_range.1),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Error while running core: {0}")]
Core(#[from] RunnerError),
#[error("Requested ledger was not found in prepared ledgers")]
LedgerNotFound,
#[error("Called closing mechanism, but core is running in single-thread mode")]
CloseOnSingleThread,
}
pub struct CaptiveCore {
pub stellar_core_runner: StellarCoreRunner,
}
impl CaptiveCore {
pub fn new(config: IngestionConfig) -> Self {
generate_predefined_cfg(&config.context_path.0, config.network);
Self {
stellar_core_runner: StellarCoreRunner::new(config),
}
}
fn offline_replay_single_thread(&mut self, from: u32, to: u32) -> Result<(), Error> {
self.stellar_core_runner.catchup_single_thread(from, to)?;
Ok(())
}
fn offline_replay_multi_thread(
&mut self,
from: u32,
to: u32,
) -> Result<Receiver<Box<MetaResult>>, Error> {
Ok(self.stellar_core_runner.catchup_multi_thread(from, to)?)
}
pub fn prepare_ledgers_single_thread(&mut self, range: &Range) -> Result<(), Error> {
match range {
Range::Bounded(range) => {
self.offline_replay_single_thread(range.0, range.1)?;
}
};
Ok(())
}
pub fn prepare_ledgers_multi_thread(
&mut self,
range: &Range,
) -> Result<Receiver<Box<MetaResult>>, Error> {
let receiver = match range {
Range::Bounded(range) => self.offline_replay_multi_thread(range.0, range.1)?,
};
Ok(receiver)
}
pub fn close_runner_process(&mut self) -> Result<(), Error> {
if (self.stellar_core_runner.thread_mode())
== Some(&BufferedLedgerMetaReaderMode::SingleThread)
{
return Err(Error::CloseOnSingleThread);
}
Ok(self.stellar_core_runner.close_runner()?)
}
pub fn get_ledger(&self, sequence: u32) -> Result<LedgerCloseMeta, Error> {
let prepared = self.stellar_core_runner.read_prepared();
for ledger in prepared {
if let Some(wrapper) = ledger.ledger_close_meta {
let meta = wrapper.ledger_close_meta;
let ledger_seq = match meta.clone() {
LedgerCloseMeta::V1(v1) => v1.ledger_header.header.ledger_seq,
LedgerCloseMeta::V0(v0) => v0.ledger_header.header.ledger_seq,
};
if ledger_seq == sequence {
return Ok(meta);
}
}
}
Err(Error::LedgerNotFound)
}
pub async fn async_prepare_ledgers(&mut self, range: &Range, to_current: bool) -> Result<tokio::sync::mpsc::UnboundedReceiver<Box<MetaResult>>, Error> {
match range {
Range::Bounded(range) => {
self.stellar_core_runner.async_catchup_multi_thread(range.0, range.1, to_current).await.map_err(|runner| Error::Core(runner))
}
}
}
pub fn start_online_no_range(&mut self) -> Result<Receiver<Box<MetaResult>>, Error> {
Ok(self.stellar_core_runner.run()?)
}
pub async fn async_start_online_no_range(&mut self) -> Result<tokio::sync::mpsc::UnboundedReceiver<Box<MetaResult>>, Error> {
Ok(self.stellar_core_runner.run_async().await?)
}
}