use crate::chain::ChainFamily;
use crate::error::{BatchDecodeError, DecodeError};
use crate::event::{DecodedEvent, EventFingerprint, RawEvent};
use crate::schema::SchemaRegistry;
/// Observer notified while a batch of events is being decoded.
///
/// Implementations must be `Send + Sync`, so a callback can be shared
/// across threads.
pub trait ProgressCallback: Send + Sync {
    /// Reports batch progress.
    ///
    /// `ChainDecoder::decode_batch` passes the number of events decoded
    /// successfully so far as `decoded` and the length of the input batch
    /// as `total`.
    fn on_progress(&self, decoded: usize, total: usize);
}
/// Lets any thread-safe `Fn(usize, usize)` closure or function act as a
/// [`ProgressCallback`]; the two arguments are forwarded unchanged as
/// `(decoded, total)`.
impl<F> ProgressCallback for F
where
    F: Fn(usize, usize) + Send + Sync,
{
    fn on_progress(&self, decoded: usize, total: usize) {
        (self)(decoded, total);
    }
}
/// Policy applied by `ChainDecoder::decode_batch` when a single item in the
/// batch cannot be decoded (missing schema or decode failure).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ErrorMode {
    /// Drop the failing item silently and keep going. This is the default.
    #[default]
    Skip,
    /// Record the failure together with the item's index and keep going.
    Collect,
    /// Abort the whole batch at the first failing item.
    Throw,
}
/// Outcome of a batch decode that ran to completion.
#[derive(Debug)]
pub struct BatchDecodeResult {
    /// Events that decoded successfully, in the order they appeared in the
    /// input batch.
    pub events: Vec<DecodedEvent>,
    /// Per-item failures as `(input index, error)` pairs. The default
    /// `ChainDecoder::decode_batch` only fills this in `ErrorMode::Collect`.
    pub errors: Vec<(usize, DecodeError)>,
}
/// Decoder for raw events, shareable across threads.
///
/// Implementors supply `chain_family`, `fingerprint` and `decode_event`;
/// `decode_batch` and `supports_abi_guess` come with default
/// implementations.
pub trait ChainDecoder: Send + Sync {
    /// Returns the chain family this decoder identifies itself with.
    fn chain_family(&self) -> ChainFamily;

    /// Computes the fingerprint of `raw`; `decode_batch` uses it as the key
    /// for the schema lookup.
    fn fingerprint(&self, raw: &RawEvent) -> EventFingerprint;

    /// Decodes one raw event against the given schema.
    ///
    /// # Errors
    ///
    /// Returns a [`DecodeError`] when the event cannot be decoded.
    fn decode_event(
        &self,
        raw: &RawEvent,
        schema: &crate::schema::Schema,
    ) -> Result<DecodedEvent, DecodeError>;

    /// Decodes every event in `logs`, resolving each schema through
    /// `registry` by fingerprint.
    ///
    /// An item fails when no schema is registered for its fingerprint
    /// (`DecodeError::SchemaNotFound`) or when `decode_event` returns an
    /// error. `mode` decides what happens next:
    ///
    /// * `ErrorMode::Skip` - the item is dropped silently.
    /// * `ErrorMode::Collect` - `(index, error)` is appended to the
    ///   result's `errors`.
    /// * `ErrorMode::Throw` - decoding stops immediately.
    ///
    /// When `progress` is provided it is called once after every item that
    /// does not abort the batch, with the number of successfully decoded
    /// events so far and `logs.len()`.
    ///
    /// # Errors
    ///
    /// In `ErrorMode::Throw`, returns `BatchDecodeError::ItemFailed` with the
    /// index and error of the first failing item. The other modes always
    /// return `Ok`.
    fn decode_batch(
        &self,
        logs: &[RawEvent],
        registry: &dyn SchemaRegistry,
        mode: ErrorMode,
        progress: Option<&dyn ProgressCallback>,
    ) -> Result<BatchDecodeResult, BatchDecodeError> {
        let total = logs.len();
        let mut events = Vec::with_capacity(total);
        let mut errors = Vec::new();

        // Single place that notifies the optional observer.
        let report = |decoded: usize| {
            if let Some(cb) = progress {
                cb.on_progress(decoded, total);
            }
        };

        for (idx, raw) in logs.iter().enumerate() {
            let fp = self.fingerprint(raw);

            // A missing schema and a failed decode are handled identically,
            // so fold both into one per-item outcome.
            let outcome = registry
                .get_by_fingerprint(&fp)
                .ok_or_else(|| DecodeError::SchemaNotFound {
                    fingerprint: fp.to_string(),
                })
                .and_then(|schema| self.decode_event(raw, &schema));

            match (outcome, mode) {
                (Ok(event), _) => events.push(event),
                (Err(_), ErrorMode::Skip) => {}
                (Err(err), ErrorMode::Collect) => errors.push((idx, err)),
                (Err(err), ErrorMode::Throw) => {
                    return Err(BatchDecodeError::ItemFailed {
                        index: idx,
                        source: err,
                    });
                }
            }

            report(events.len());
        }

        Ok(BatchDecodeResult { events, errors })
    }

    /// Whether this decoder supports ABI guessing; `false` unless an
    /// implementor overrides it.
    fn supports_abi_guess(&self) -> bool {
        false
    }
}