// chaincodec_core/decoder.rs
1//! The core `ChainDecoder` trait and associated progress/batch types.
2//!
3//! Every chain-specific decoder (EVM, Solana, Cosmos, etc.) implements
4//! `ChainDecoder`. The trait is object-safe so decoders can be stored as
5//! `Arc<dyn ChainDecoder>` in the streaming and batch engines.
6
7use crate::chain::ChainFamily;
8use crate::error::{BatchDecodeError, DecodeError};
9use crate::event::{DecodedEvent, EventFingerprint, RawEvent};
10use crate::schema::SchemaRegistry;
11
/// Observer notified by the batch engine while a long-running decode is in flight.
///
/// `Send + Sync` are supertraits, so a callback may be shared between threads.
pub trait ProgressCallback: Send + Sync {
    /// Reports the current state of the batch.
    ///
    /// * `decoded` - number of events successfully decoded so far.
    /// * `total` - total number of events in the current batch.
    fn on_progress(&self, decoded: usize, total: usize);
}
18
19/// Blanket impl so closures can be used as progress callbacks.
20impl<F: Fn(usize, usize) + Send + Sync> ProgressCallback for F {
21 fn on_progress(&self, decoded: usize, total: usize) {
22 self(decoded, total)
23 }
24}
25
/// Policy the batch engine applies when an individual event fails to decode.
///
/// The default policy is [`ErrorMode::Skip`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ErrorMode {
    /// Drop failing events without reporting them; intended for
    /// best-effort analytics.
    #[default]
    Skip,
    /// Keep decoding, recording each failure so that successes and errors
    /// are both returned once the batch finishes.
    Collect,
    /// Stop the whole batch at the first failure.
    Throw,
}
37
38/// The output of a batch decode: successful events plus any collected errors.
39#[derive(Debug)]
40pub struct BatchDecodeResult {
41 pub events: Vec<DecodedEvent>,
42 /// Populated only when `ErrorMode::Collect` is used.
43 pub errors: Vec<(usize, DecodeError)>,
44}
45
46/// The central trait every chain-specific decoder must implement.
47///
48/// # Thread Safety
49/// Implementations must be `Send + Sync` so they can be shared across
50/// Tokio tasks and Rayon threads without additional locking.
51pub trait ChainDecoder: Send + Sync {
52 /// Returns the chain family this decoder handles.
53 fn chain_family(&self) -> ChainFamily;
54
55 /// Compute the event fingerprint from a raw event.
56 /// For EVM this is `topics[0]`; for Solana it's a discriminator hash, etc.
57 fn fingerprint(&self, raw: &RawEvent) -> EventFingerprint;
58
59 /// Decode a single raw event using the provided schema.
60 fn decode_event(
61 &self,
62 raw: &RawEvent,
63 schema: &crate::schema::Schema,
64 ) -> Result<DecodedEvent, DecodeError>;
65
66 /// Decode a batch of raw events.
67 ///
68 /// The default implementation calls `decode_event` for each log, but
69 /// chain-specific crates can override this for parallelism (Rayon) or
70 /// other optimizations.
71 fn decode_batch(
72 &self,
73 logs: &[RawEvent],
74 registry: &dyn SchemaRegistry,
75 mode: ErrorMode,
76 progress: Option<&dyn ProgressCallback>,
77 ) -> Result<BatchDecodeResult, BatchDecodeError> {
78 let mut events = Vec::with_capacity(logs.len());
79 let mut errors = Vec::new();
80
81 for (idx, raw) in logs.iter().enumerate() {
82 let fp = self.fingerprint(raw);
83 let schema = match registry.get_by_fingerprint(&fp) {
84 Some(s) => s,
85 None => {
86 let err = DecodeError::SchemaNotFound {
87 fingerprint: fp.to_string(),
88 };
89 match mode {
90 ErrorMode::Skip => {
91 if let Some(cb) = progress {
92 cb.on_progress(events.len(), logs.len());
93 }
94 continue;
95 }
96 ErrorMode::Collect => {
97 errors.push((idx, err));
98 if let Some(cb) = progress {
99 cb.on_progress(events.len(), logs.len());
100 }
101 continue;
102 }
103 ErrorMode::Throw => {
104 return Err(BatchDecodeError::ItemFailed {
105 index: idx,
106 source: err,
107 });
108 }
109 }
110 }
111 };
112
113 match self.decode_event(raw, &schema) {
114 Ok(event) => {
115 events.push(event);
116 }
117 Err(err) => match mode {
118 ErrorMode::Skip => {}
119 ErrorMode::Collect => errors.push((idx, err)),
120 ErrorMode::Throw => {
121 return Err(BatchDecodeError::ItemFailed {
122 index: idx,
123 source: err,
124 });
125 }
126 },
127 }
128
129 if let Some(cb) = progress {
130 cb.on_progress(events.len(), logs.len());
131 }
132 }
133
134 Ok(BatchDecodeResult { events, errors })
135 }
136
137 /// Whether this decoder can attempt to guess/auto-detect a schema
138 /// from raw bytes when no schema is found in the registry.
139 fn supports_abi_guess(&self) -> bool {
140 false
141 }
142}