use std::{borrow::Cow, collections::HashMap, pin::Pin};
use futures_util::{Future, FutureExt, StreamExt};
use smallvec::SmallVec;
use tracing::{trace, Instrument, Span};
use vixen_core::{
AccountUpdate, BlockMetaUpdate, BlockUpdate, GetPrefilter, ParserId, SlotUpdate,
TransactionUpdate,
};
use yellowstone_vixen_core::{Filters, ParseError, Parser, Prefilter};
#[cfg(feature = "prometheus")]
use crate::metrics;
/// A heap-allocated error trait object that is `Send + Sync + 'static`, used to
/// carry parser and handler errors of any concrete type through this module.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// The result type returned by handlers: `Ok(T)` on success, or a boxed
/// `std::error::Error` trait object on failure.
pub type HandlerResult<T> = Result<T, BoxedError>;
/// A consumer of parsed values.
///
/// `T` is the parsed value type and `R` is the raw event the value was parsed
/// from; a [`Pipeline`] invokes its handlers with the parser's output and the
/// original input respectively.
pub trait Handler<T, R: Sync> {
    /// Processes one parsed `value` together with the `raw_event` it came from.
    ///
    /// The returned future must be `Send` and resolves to `Ok(())` on success
    /// or to a boxed error on failure.
    fn handle(&self, value: &T, raw_event: &R) -> impl Future<Output = HandlerResult<()>> + Send;
}
/// Forwarding implementation: a shared reference to a handler is itself a
/// handler that delegates to the referenced value.
impl<T, U, R> Handler<U, R> for &T
where
    T: Handler<U, R>,
    R: Sync,
{
    #[inline]
    fn handle(&self, value: &U, raw_event: &R) -> impl Future<Output = HandlerResult<()>> + Send {
        // `self` is `&&T`; strip one layer of reference and call `T`'s implementation.
        let inner: &T = self;
        <T as Handler<U, R>>::handle(inner, value, raw_event)
    }
}
pub(crate) use pipeline_error::Errors as PipelineErrors;
/// Error types produced by running a pipeline, and helpers for reporting them.
mod pipeline_error {
    use smallvec::SmallVec;

    use super::BoxedError;

    /// Marker value returned after a set of pipeline errors has been logged.
    ///
    /// The type is `#[must_use]`; call [`Handled::as_unit`] to discard it
    /// explicitly.
    #[derive(Debug, Clone, Copy)]
    #[must_use]
    pub struct Handled(());

    impl Handled {
        /// Consumes the marker and evaluates to `()`.
        #[inline]
        pub fn as_unit(self) {
            let Handled(()) = self;
        }
    }

    /// The error outcome of a single pipeline run.
    #[derive(Debug)]
    pub enum Errors {
        /// A single error raised while parsing the input.
        Parse(BoxedError),
        /// The errors returned by the handlers.
        Handlers(SmallVec<[BoxedError; 1]>),
        /// Errors that were already reported; iterating this variant yields
        /// nothing.
        AlreadyHandled(Handled),
    }

    impl Errors {
        /// Boxes `e` and wraps it as a parse error.
        #[inline]
        #[must_use]
        pub fn parse<E>(e: E) -> Self
        where
            E: std::error::Error + Send + Sync + 'static,
        {
            let boxed: BoxedError = Box::new(e);
            Errors::Parse(boxed)
        }

        /// Logs every contained error at `error` level and returns a
        /// [`Handled`] marker.
        ///
        /// `handler` is recorded as the `handler` field of each event, and the
        /// type name of `T` is recorded as the `type` field.
        pub fn handle<T>(self, handler: &str) -> Handled {
            self.into_iter().for_each(|e| {
                tracing::error!(
                    err = %crate::Chain(&e),
                    handler,
                    r#type = std::any::type_name::<T>(),
                    "Handler failed",
                );
            });
            Handled(())
        }
    }

    impl IntoIterator for Errors {
        type Item = Error;
        type IntoIter = IntoIter;

        /// Flattens the error set into a stream of individually tagged
        /// [`Error`] values.
        fn into_iter(self) -> Self::IntoIter {
            match self {
                Self::Parse(single) => IntoIter::Parse(IntoIterator::into_iter([single])),
                Self::Handlers(many) => IntoIter::Handlers(many.into_iter()),
                Self::AlreadyHandled(_) => IntoIter::AlreadyHandled,
            }
        }
    }

    /// A single error yielded when iterating over [`Errors`], tagged with the
    /// stage that produced it.
    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        /// The underlying error came from the parse stage.
        #[error("Error parsing input value: ({0})")]
        Parser(#[source] BoxedError),
        /// The underlying error was returned by a handler.
        #[error("Handler returned an error on parsed value: ({0})")]
        Handler(#[source] BoxedError),
    }

    /// Owning iterator over the contents of an [`Errors`] value.
    #[derive(Debug)]
    pub enum IntoIter {
        /// Iterates the single parse error.
        Parse(std::array::IntoIter<BoxedError, 1>),
        /// Iterates the handler errors.
        Handlers(smallvec::IntoIter<[BoxedError; 1]>),
        /// Yields nothing.
        AlreadyHandled,
    }

    impl Iterator for IntoIter {
        type Item = Error;

        fn next(&mut self) -> Option<Self::Item> {
            match self {
                IntoIter::AlreadyHandled => None,
                IntoIter::Parse(single) => Some(Error::Parser(single.next()?)),
                IntoIter::Handlers(many) => Some(Error::Handler(many.next()?)),
            }
        }
    }
}
/// A parser (`.0`) paired with the handlers (`.1`) that consume its output.
#[derive(Debug)]
pub struct Pipeline<P, H>(P, H);

impl<P, H> Pipeline<P, H> {
    /// Builds a pipeline from `parser` and `handlers`.
    #[inline]
    #[must_use]
    pub fn new(parser: P, handlers: H) -> Self {
        Pipeline(parser, handlers)
    }
}
impl<P: ParserId, H> ParserId for Pipeline<P, H> {
#[inline]
fn id(&self) -> Cow<'static, str> { self.0.id() }
}
impl<P: GetPrefilter, H> GetPrefilter for Pipeline<P, H> {
#[inline]
fn prefilter(&self) -> Prefilter { self.0.prefilter() }
}
/// A boxed, type-erased [`DynPipeline`] over input type `T` that is
/// `Send + Sync` and may borrow data for the lifetime `'h`.
pub type BoxPipeline<'h, T> = Box<dyn DynPipeline<T> + Send + Sync + 'h>;
/// Parse-then-dispatch logic for pipelines whose handler collection can be
/// iterated by shared reference.
impl<P, I> Pipeline<P, I>
where
for<'i> &'i I: IntoIterator,
P: Parser,
P::Input: Sync,
for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output, P::Input>,
{
/// Parses `value` and passes the parsed output, together with `value`
/// itself, to every handler.
///
/// Returns `Ok(())` when the parser reports `ParseError::Filtered` or when
/// no handler returns an error.
///
/// # Errors
///
/// Returns `PipelineErrors::Parse` if parsing fails with
/// `ParseError::DiscriminatorNotFound` or `ParseError::Other`, and
/// `PipelineErrors::Handlers` containing every handler error if at least one
/// handler fails.
pub async fn handle(&self, value: &P::Input) -> Result<(), PipelineErrors> {
// Parsing runs inside its own `vixen.parse` span.
let parsed = match self
.0
.parse(value)
.instrument(tracing::info_span!("vixen.parse",))
.await
{
Ok(p) => p,
// A filtered input is not an error: there is simply nothing to handle.
Err(ParseError::Filtered) => return Ok(()),
Err(ParseError::DiscriminatorNotFound(msg)) => {
return Err(PipelineErrors::Parse(msg.into()));
},
Err(ParseError::Other(e)) => return Err(PipelineErrors::Parse(e)),
};
// Shadow with a reference so each `async move` block below copies the
// reference instead of trying to move the parsed value.
let parsed = &parsed;
// All handler futures are collected into one `FuturesUnordered` and driven
// together; each runs inside its own `vixen.handle` span. Only the errors
// are kept.
let errs = (&self.1)
.into_iter()
.map(|h| async move {
h.handle(parsed, value)
.instrument(tracing::info_span!("vixen.handle",))
.await
})
.collect::<futures_util::stream::FuturesUnordered<_>>()
.filter_map(|r| async move { r.err() })
.collect::<SmallVec<[_; 1]>>()
.await;
if errs.is_empty() {
Ok(())
} else {
Err(PipelineErrors::Handlers(errs))
}
}
}
/// Trait-object-friendly view of a pipeline over input type `T`.
///
/// Unlike the inherent `Pipeline::handle`, which is an `async fn`, this trait
/// returns a boxed future so it can be used behind `dyn` (see `BoxPipeline`).
pub trait DynPipeline<T>: std::fmt::Debug + ParserId + GetPrefilter {
/// Runs the pipeline on `value`, returning a boxed `Send` future that
/// borrows both `self` and `value` for `'h`.
fn handle<'h>(
&'h self,
value: &'h T,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineErrors>> + Send + 'h>>;
}
/// `Infallible` has no values, so this implementation can never be invoked;
/// the empty `match` proves that to the compiler.
impl<T> DynPipeline<T> for std::convert::Infallible {
    fn handle<'h>(
        &'h self,
        _value: &'h T,
    ) -> Pin<Box<dyn Future<Output = Result<(), PipelineErrors>> + Send + 'h>> {
        match *self {}
    }
}
/// Type-erasing adapter: boxes the future produced by the inherent
/// `Pipeline::handle` so a concrete pipeline can be used as a `DynPipeline`.
impl<P, I> DynPipeline<P::Input> for Pipeline<P, I>
where
    P: std::fmt::Debug + Parser + Sync,
    I: std::fmt::Debug + Sync,
    for<'i> &'i I: IntoIterator,
    P::Input: Sync,
    P::Output: Send + Sync,
    for<'i> <&'i I as IntoIterator>::Item: Handler<P::Output, P::Input> + Send,
{
    fn handle<'h>(
        &'h self,
        value: &'h P::Input,
    ) -> Pin<Box<dyn Future<Output = Result<(), PipelineErrors>> + Send + 'h>> {
        // The path form selects the inherent async method, not this trait method.
        let run = Pipeline::handle(self, value);
        Box::pin(run)
    }
}
impl<T> ParserId for BoxPipeline<'_, T> {
fn id(&self) -> Cow<'static, str> { <dyn DynPipeline<T>>::id(&**self) }
}
impl<T> GetPrefilter for BoxPipeline<'_, T> {
#[inline]
fn prefilter(&self) -> Prefilter { <dyn DynPipeline<T>>::prefilter(&**self) }
}
impl<T> DynPipeline<T> for BoxPipeline<'_, T> {
#[inline]
fn handle<'h>(
&'h self,
value: &'h T,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineErrors>> + Send + 'h>> {
<dyn DynPipeline<T>>::handle(&**self, value)
}
}
/// The pipeline sets for every kind of update, one set per kind.
#[derive(Debug)]
pub(crate) struct PipelineSets {
    /// Pipelines that consume `AccountUpdate`s.
    pub account: PipelineSet<BoxPipeline<'static, AccountUpdate>>,
    /// Pipelines that consume `TransactionUpdate`s.
    pub transaction: PipelineSet<BoxPipeline<'static, TransactionUpdate>>,
    /// Instruction pipelines; these also take `TransactionUpdate` as input.
    pub instruction: PipelineSet<BoxPipeline<'static, TransactionUpdate>>,
    /// Pipelines that consume `BlockMetaUpdate`s.
    pub block_meta: PipelineSet<BoxPipeline<'static, BlockMetaUpdate>>,
    /// Pipelines that consume `BlockUpdate`s.
    pub block: PipelineSet<BoxPipeline<'static, BlockUpdate>>,
    /// Pipelines that consume `SlotUpdate`s.
    pub slot: PipelineSet<BoxPipeline<'static, SlotUpdate>>,
}

impl PipelineSets {
    /// Gathers the `(key, prefilter)` pairs of every set into one `Filters`.
    ///
    /// Sets are visited in the order account, transaction, instruction,
    /// block-meta, block, slot.
    #[must_use]
    pub fn filters(&self) -> Filters {
        let Self {
            account,
            transaction,
            instruction,
            block_meta,
            block,
            slot,
        } = self;

        let combined = account
            .filters()
            .chain(transaction.filters())
            .chain(instruction.filters())
            .chain(block_meta.filters())
            .chain(block.filters())
            .chain(slot.filters())
            .collect();

        Filters::new(combined)
    }
}
/// A collection of pipelines indexed by a `String` key.
#[derive(Debug)]
pub(crate) struct PipelineSet<P>(HashMap<String, P>);

impl<P> PipelineSet<P> {
    /// Creates an empty set.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        PipelineSet(HashMap::default())
    }

    /// Returns the number of pipelines in the set.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        let Self(map) = self;
        map.len()
    }

    /// Stores `value` under `key`, returning the pipeline previously stored
    /// under that key, if any.
    #[inline]
    pub fn insert(&mut self, key: String, value: P) -> Option<P> {
        let Self(map) = self;
        map.insert(key, value)
    }
}
impl<P: GetPrefilter> PipelineSet<P> {
    /// Lazily yields `(key, prefilter)` for every pipeline in the set.
    ///
    /// Each key is cloned and each prefilter is obtained from the pipeline as
    /// the iterator is advanced. The iterator borrows `self` (hence `+ '_`),
    /// which avoids collecting the pairs into a temporary `Vec` first.
    #[inline]
    fn filters(&self) -> impl Iterator<Item = (String, Prefilter)> + '_ {
        self.0
            .iter()
            .map(|(key, pipeline)| (key.clone(), pipeline.prefilter()))
    }
}
impl<P> PipelineSet<P> {
    /// Pairs this set with `it` in a [`Pipelines`] value; nothing is looked up
    /// or consumed here.
    pub(crate) fn get_handlers<I>(&self, it: I) -> Pipelines<'_, P, I> {
        let set: &Self = self;
        Pipelines(set, it)
    }
}
impl<P: ParserId> FromIterator<P> for PipelineSet<P> {
fn from_iter<I: IntoIterator<Item = P>>(iter: I) -> Self {
Self(iter.into_iter().map(|i| (i.id().into_owned(), i)).collect())
}
}
/// A pipeline set (`.0`) paired with the keys (`.1`) selecting which of its
/// pipelines should run.
#[derive(Debug)]
pub(crate) struct Pipelines<'m, H, I>(&'m PipelineSet<H>, I);

impl<'m, H, I: IntoIterator> Pipelines<'m, H, I>
where
    I::Item: AsRef<str> + Send + 'm,
{
    /// Resolves each key to its pipeline, yielding `(key, pipeline)` pairs.
    ///
    /// Keys with no matching pipeline are skipped after emitting a `trace`
    /// event.
    fn get_pipelines(self) -> impl Iterator<Item = (I::Item, &'m H)> {
        let Pipelines(set, keys) = self;
        keys.into_iter().filter_map(move |f| {
            // The local must be named `filter`: the `trace!` shorthand below
            // uses the variable name as the recorded field name.
            let filter = f.as_ref();
            match set.0.get(filter) {
                Some(pipeline) => Some((f, pipeline)),
                None => {
                    trace!(filter, "No pipeline matched filter on incoming update");
                    None
                },
            }
        })
    }

    /// Runs every selected pipeline on `value` and resolves once all of them
    /// have finished.
    ///
    /// Pipeline errors are not returned: each failing run is logged through
    /// `PipelineErrors::handle`, with the matching key as the handler name.
    /// With the `prometheus` feature enabled, every run's result is also
    /// passed to `metrics::increment_processed_updates` with `update_type`.
    ///
    /// `span` is entered only while this function builds the futures; each
    /// one is wrapped with `in_current_span` at that point.
    pub fn run<'h, T>(
        self,
        span: Span,
        value: &'h T,
        #[cfg(feature = "prometheus")] update_type: metrics::UpdateType,
    ) -> impl Future<Output = ()> + Send + 'h
    where
        H: DynPipeline<T>,
        'm: 'h,
    {
        // Guard is dropped when this function returns, after the futures exist.
        let _entered = span.entered();

        let runs = self.get_pipelines().map(move |(key, pipeline)| {
            let pending = pipeline.handle(value);
            pending
                .map(move |outcome| {
                    #[cfg(feature = "prometheus")]
                    metrics::increment_processed_updates(&outcome, update_type);
                    if let Err(errors) = outcome {
                        errors.handle::<T>(key.as_ref()).as_unit();
                    }
                })
                .in_current_span()
        });

        // Collapse the `Vec<()>` of per-pipeline results into a single `()`.
        futures_util::future::join_all(runs).map(|finished| finished.into_iter().collect())
    }
}