use vixen_core::{
instruction::InstructionUpdate, AccountUpdate, BlockMetaUpdate, BlockUpdate, SlotUpdate,
TransactionUpdate,
};
use crate::{
config::VixenConfig,
handler::{BoxPipeline, DynPipeline, PipelineSet, PipelineSets},
instruction::SingleInstructionPipeline,
sources::SourceTrait,
util, Runtime,
};
pub trait BuilderKind: Default {
type Error: std::error::Error;
}
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
#[error("ID collision detected among account pipelines")]
AccountPipelineCollision,
#[error("ID collision detected among transaction pipelines")]
TransactionPipelineCollision,
#[error("ID collision detected among slot pipelines")]
SlotPipelineCollision,
#[error("ID collision detected among block meta pipelines")]
BlockMetaCollision,
#[error("ID collision detected among block pipelines")]
BlockCollision,
#[error("Missing field {0:?}")]
MissingField(&'static str),
#[error("Missing config section {0:?}")]
MissingConfig(&'static str),
#[error("Error instantiating metrics backend")]
Metrics(#[source] Box<dyn std::error::Error>),
}
#[derive(Debug)]
#[must_use = "Consider calling .build() on this builder"]
pub struct Builder<K: BuilderKind, S: SourceTrait> {
pub err: Result<(), K::Error>,
pub account: Vec<BoxPipeline<'static, AccountUpdate>>,
pub transaction: Vec<BoxPipeline<'static, TransactionUpdate>>,
pub instruction: Vec<BoxPipeline<'static, InstructionUpdate>>,
pub block_meta: Vec<BoxPipeline<'static, BlockMetaUpdate>>,
pub block: Vec<BoxPipeline<'static, BlockUpdate>>,
pub slot: Vec<BoxPipeline<'static, SlotUpdate>>,
#[cfg(feature = "prometheus")]
pub metrics_registry: prometheus::Registry,
pub extra: K,
pub _source: std::marker::PhantomData<S>,
}
impl<K: BuilderKind, S: SourceTrait> Default for Builder<K, S> {
fn default() -> Self {
Self {
err: Ok(()),
account: vec![],
transaction: vec![],
instruction: vec![],
block_meta: vec![],
block: vec![],
slot: vec![],
extra: K::default(),
_source: std::marker::PhantomData,
#[cfg(feature = "prometheus")]
metrics_registry: prometheus::Registry::new(),
}
}
}
impl<K: BuilderKind, S: SourceTrait> Builder<K, S> {
#[inline]
pub fn mutate(self, mutate: impl FnOnce(&mut Self)) -> Self {
self.try_mutate(|s| {
mutate(s);
Ok(())
})
}
#[inline]
pub fn try_mutate(mut self, mutate: impl FnOnce(&mut Self) -> Result<(), K::Error>) -> Self {
if let Ok(()) = self.err {
self.err = mutate(&mut self);
}
self
}
#[cfg(feature = "prometheus")]
pub fn metrics(self, metrics_registry: prometheus::Registry) -> Builder<K, S> {
self.mutate(|s| s.metrics_registry = metrics_registry)
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct RuntimeKind;
pub type RuntimeBuilder<S> = Builder<RuntimeKind, S>;
impl BuilderKind for RuntimeKind {
type Error = BuilderError;
}
impl<S: SourceTrait> RuntimeBuilder<S> {
pub fn account<A: DynPipeline<AccountUpdate> + Send + Sync + 'static>(
self,
account: A,
) -> Self {
self.mutate(|s| s.account.push(Box::new(account)))
}
pub fn transaction<T: DynPipeline<TransactionUpdate> + Send + Sync + 'static>(
self,
transaction: T,
) -> Self {
self.mutate(|s| s.transaction.push(Box::new(transaction)))
}
pub fn instruction<I: DynPipeline<InstructionUpdate> + Send + Sync + 'static>(
self,
instruction: I,
) -> Self {
self.mutate(|s| s.instruction.push(Box::new(instruction)))
}
pub fn block_meta<T: DynPipeline<BlockMetaUpdate> + Send + Sync + 'static>(
self,
block_meta: T,
) -> Self {
self.mutate(|s| s.block_meta.push(Box::new(block_meta)))
}
pub fn block<T: DynPipeline<BlockUpdate> + Send + Sync + 'static>(self, block: T) -> Self {
self.mutate(|s| s.block.push(Box::new(block)))
}
pub fn slot<T: DynPipeline<SlotUpdate> + Send + Sync + 'static>(self, slot: T) -> Self {
self.mutate(|s| s.slot.push(Box::new(slot)))
}
pub fn try_build(self, config: VixenConfig<S::Config>) -> Result<Runtime<S>, BuilderError> {
let Self {
err,
account,
transaction,
instruction,
block_meta,
block,
slot,
extra: RuntimeKind,
_source,
#[cfg(feature = "prometheus")]
metrics_registry,
} = self;
let () = err?;
let VixenConfig {
source: source_cfg,
buffer: buffer_cfg,
} = config;
let mut ixs = PipelineSet::new();
for ix in instruction {
let id = ix.id().into_owned();
let pre_existent_parser = ixs.insert(
id.clone(),
Box::new(SingleInstructionPipeline::new(ix))
as BoxPipeline<'static, TransactionUpdate>,
);
if pre_existent_parser.is_some() {
tracing::warn!("Duplicate parser ID detected: {}", id);
}
}
let account_len = account.len();
let transaction_len = transaction.len();
let block_meta_len = block_meta.len();
let block_len = block.len();
let slot_len = slot.len();
let pipelines = PipelineSets {
account: account.into_iter().collect(),
transaction: transaction.into_iter().collect(),
instruction: ixs,
block_meta: block_meta.into_iter().collect(),
block: block.into_iter().collect(),
slot: slot.into_iter().collect(),
};
if pipelines.account.len() != account_len {
return Err(BuilderError::AccountPipelineCollision);
}
if pipelines.transaction.len() != transaction_len {
return Err(BuilderError::TransactionPipelineCollision);
}
if pipelines.block_meta.len() != block_meta_len {
return Err(BuilderError::BlockMetaCollision);
}
if pipelines.block.len() != block_len {
return Err(BuilderError::BlockCollision);
}
if pipelines.slot.len() != slot_len {
return Err(BuilderError::SlotPipelineCollision);
}
Ok(Runtime {
buffer: buffer_cfg,
source: source_cfg,
pipelines,
_source: std::marker::PhantomData,
#[cfg(feature = "prometheus")]
metrics_registry,
})
}
#[inline]
#[must_use]
pub fn build(self, config: VixenConfig<S::Config>) -> Runtime<S> {
util::handle_fatal_msg(self.try_build(config), "Error building Vixen runtime")
}
}