use std::fmt::{self, Debug};
use vixen_core::{instruction::InstructionUpdate, GetPrefilter, ParserId, TransactionUpdate};
use crate::handler::{BoxPipeline, DynPipeline, PipelineErrors};
#[cfg(feature = "prometheus")]
use crate::metrics;
/// A fixed set of boxed instruction pipelines.
///
/// The wrapped boxed slice holds one [`BoxPipeline`] per sub-pipeline; each of
/// them accepts [`InstructionUpdate`] values.
pub struct InstructionPipeline(Box<[BoxPipeline<'static, InstructionUpdate>]>);
impl fmt::Debug for InstructionPipeline {
    /// Formats the pipeline as a tuple struct named `InstructionPipeline`
    /// whose single field is the slice of wrapped sub-pipelines.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("InstructionPipeline");
        tuple.field(&self.0);
        tuple.finish()
    }
}
impl InstructionPipeline {
    /// Creates a pipeline set from `pipelines`.
    ///
    /// Returns `None` when `pipelines` is empty; otherwise the vector is
    /// converted into a boxed slice and wrapped.
    #[must_use]
    pub fn new(pipelines: Vec<BoxPipeline<'static, InstructionUpdate>>) -> Option<Self> {
        if pipelines.is_empty() {
            None
        } else {
            Some(Self(pipelines.into_boxed_slice()))
        }
    }

    /// Builds the instruction updates for `txn` and passes every visited
    /// instruction to every wrapped sub-pipeline, in order.
    ///
    /// A failing sub-pipeline does not stop the dispatch: the remaining
    /// sub-pipelines and instructions are still processed.
    ///
    /// # Errors
    /// - Returns the mapped parse error immediately if the instruction
    ///   updates cannot be built from `txn`.
    /// - Returns `PipelineErrors::AlreadyHandled` after the full pass if at
    ///   least one sub-pipeline returned an error other than
    ///   `AlreadyHandled`; only the most recent such failure is carried.
    pub async fn handle(&self, txn: &TransactionUpdate) -> Result<(), PipelineErrors> {
        let parsed = InstructionUpdate::build_from_txn(txn).map_err(PipelineErrors::parse)?;
        let mut last_failure = None;

        for instruction in parsed.iter().flat_map(|root| root.visit_all()) {
            for pipeline in self.0.iter() {
                let outcome = pipeline.handle(instruction).await;

                #[cfg(feature = "prometheus")]
                metrics::increment_processed_updates(&outcome, metrics::UpdateType::Instruction);

                match outcome {
                    Ok(()) => {},
                    // Already dealt with further down; nothing left to report.
                    Err(PipelineErrors::AlreadyHandled(handled)) => handled.as_unit(),
                    // Process the error now, remember it, and keep dispatching.
                    Err(other) => {
                        last_failure = Some(other.handle::<InstructionUpdate>(&pipeline.id()));
                    },
                }
            }
        }

        match last_failure {
            Some(handled) => Err(PipelineErrors::AlreadyHandled(handled)),
            None => Ok(()),
        }
    }
}
impl ParserId for InstructionPipeline {
    /// Returns the fixed identifier `"InstructionPipeline"` as a borrowed
    /// static string (no allocation).
    fn id(&self) -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed("InstructionPipeline")
    }
}
impl GetPrefilter for InstructionPipeline {
    /// Collects the prefilter of every wrapped sub-pipeline into a single
    /// [`vixen_core::Prefilter`].
    fn prefilter(&self) -> vixen_core::Prefilter {
        let per_pipeline = self.0.iter().map(GetPrefilter::prefilter);
        per_pipeline.collect::<vixen_core::Prefilter>()
    }
}
impl DynPipeline<TransactionUpdate> for InstructionPipeline {
    /// Object-safe entry point: pins and boxes the future produced by the
    /// inherent [`InstructionPipeline::handle`] so it can be returned as a
    /// `dyn Future`.
    fn handle<'h>(
        &'h self,
        value: &'h TransactionUpdate,
    ) -> std::pin::Pin<Box<dyn futures_util::Future<Output = Result<(), PipelineErrors>> + Send + 'h>>
    {
        // `Self::handle` resolves to the inherent async method, not this trait method.
        let fut = Self::handle(self, value);
        Box::pin(fut)
    }
}
/// A wrapper around exactly one boxed instruction pipeline.
///
/// The wrapped [`BoxPipeline`] accepts [`InstructionUpdate`] values.
pub struct SingleInstructionPipeline(BoxPipeline<'static, InstructionUpdate>);
impl SingleInstructionPipeline {
    /// Wraps a single instruction sub-pipeline.
    #[must_use]
    pub fn new(pipeline: BoxPipeline<'static, InstructionUpdate>) -> Self { Self(pipeline) }

    /// Builds the instruction updates for `txn` and passes every visited
    /// instruction to the wrapped sub-pipeline.
    ///
    /// A failure on one instruction does not stop the dispatch: every
    /// remaining instruction of the transaction is still handed to the
    /// sub-pipeline, matching the behaviour of `InstructionPipeline::handle`.
    ///
    /// # Errors
    /// - Returns the mapped parse error immediately if the instruction
    ///   updates cannot be built from `txn`.
    /// - Returns `PipelineErrors::AlreadyHandled` after the full pass if the
    ///   sub-pipeline returned an error other than `AlreadyHandled` for at
    ///   least one instruction; only the most recent such failure is carried.
    pub async fn handle(&self, txn: &TransactionUpdate) -> Result<(), PipelineErrors> {
        let mut err = None;
        let ixs = InstructionUpdate::build_from_txn(txn).map_err(PipelineErrors::parse)?;
        let pipe = &self.0;

        for insn in ixs.iter().flat_map(|i| i.visit_all()) {
            let res = pipe.handle(insn).await;

            #[cfg(feature = "prometheus")]
            metrics::increment_processed_updates(&res, metrics::UpdateType::Instruction);

            match res {
                Ok(()) => (),
                Err(PipelineErrors::AlreadyHandled(h)) => h.as_unit(),
                // Do not return here: an early return would silently skip the
                // remaining instructions of this transaction.
                Err(e) => err = Some(e.handle::<InstructionUpdate>(&pipe.id())),
            }
        }

        match err {
            Some(h) => Err(PipelineErrors::AlreadyHandled(h)),
            None => Ok(()),
        }
    }
}
impl ParserId for SingleInstructionPipeline {
/// Returns the identifier reported by the wrapped sub-pipeline.
fn id(&self) -> std::borrow::Cow<'static, str> { self.0.id() }
}
impl GetPrefilter for SingleInstructionPipeline {
/// Returns the prefilter reported by the wrapped sub-pipeline, unchanged.
fn prefilter(&self) -> vixen_core::Prefilter { self.0.prefilter() }
}
impl Debug for SingleInstructionPipeline {
    /// Formats the pipeline as a tuple struct named
    /// `SingleInstructionPipeline` whose single field is the wrapped
    /// sub-pipeline.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("SingleInstructionPipeline");
        tuple.field(&self.0);
        tuple.finish()
    }
}
impl DynPipeline<TransactionUpdate> for SingleInstructionPipeline {
    /// Object-safe entry point: pins and boxes the future produced by the
    /// inherent [`SingleInstructionPipeline::handle`] so it can be returned
    /// as a `dyn Future`.
    fn handle<'h>(
        &'h self,
        value: &'h TransactionUpdate,
    ) -> std::pin::Pin<Box<dyn futures_util::Future<Output = Result<(), PipelineErrors>> + Send + 'h>>
    {
        // `Self::handle` resolves to the inherent async method, not this trait method.
        let fut = Self::handle(self, value);
        Box::pin(fut)
    }
}