Skip to main content

chaincodec_core/
decoder.rs

1//! The core `ChainDecoder` trait and associated progress/batch types.
2//!
3//! Every chain-specific decoder (EVM, Solana, Cosmos, etc.) implements
4//! `ChainDecoder`. The trait is object-safe so decoders can be stored as
5//! `Arc<dyn ChainDecoder>` in the streaming and batch engines.
6
7use crate::chain::ChainFamily;
8use crate::error::{BatchDecodeError, DecodeError};
9use crate::event::{DecodedEvent, EventFingerprint, RawEvent};
10use crate::schema::SchemaRegistry;
11
/// Progress hook the batch engine calls while a long-running decode is underway.
///
/// Implementors must be `Send + Sync` (both are supertraits), so a single
/// callback value can be shared by reference across threads.
pub trait ProgressCallback: Send + Sync {
    /// Reports the current state of the batch.
    ///
    /// * `decoded` - number of events successfully decoded so far.
    /// * `total` - total number of events in the current batch.
    fn on_progress(&self, decoded: usize, total: usize);
}
18
19/// Blanket impl so closures can be used as progress callbacks.
20impl<F: Fn(usize, usize) + Send + Sync> ProgressCallback for F {
21    fn on_progress(&self, decoded: usize, total: usize) {
22        self(decoded, total)
23    }
24}
25
/// Policy the batch engine applies when an individual event fails to decode.
///
/// The default is [`ErrorMode::Skip`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ErrorMode {
    /// Drop failing events without recording them. Suitable for best-effort
    /// analytics where gaps are acceptable.
    #[default]
    Skip,
    /// Keep going, but record every failure so it is returned alongside the
    /// successfully decoded events at the end of the batch.
    Collect,
    /// Stop at the first failure and fail the entire batch.
    Throw,
}
37
38/// The output of a batch decode: successful events plus any collected errors.
39#[derive(Debug)]
40pub struct BatchDecodeResult {
41    pub events: Vec<DecodedEvent>,
42    /// Populated only when `ErrorMode::Collect` is used.
43    pub errors: Vec<(usize, DecodeError)>,
44}
45
46/// The central trait every chain-specific decoder must implement.
47///
48/// # Thread Safety
49/// Implementations must be `Send + Sync` so they can be shared across
50/// Tokio tasks and Rayon threads without additional locking.
51pub trait ChainDecoder: Send + Sync {
52    /// Returns the chain family this decoder handles.
53    fn chain_family(&self) -> ChainFamily;
54
55    /// Compute the event fingerprint from a raw event.
56    /// For EVM this is `topics[0]`; for Solana it's a discriminator hash, etc.
57    fn fingerprint(&self, raw: &RawEvent) -> EventFingerprint;
58
59    /// Decode a single raw event using the provided schema.
60    fn decode_event(
61        &self,
62        raw: &RawEvent,
63        schema: &crate::schema::Schema,
64    ) -> Result<DecodedEvent, DecodeError>;
65
66    /// Decode a batch of raw events.
67    ///
68    /// The default implementation calls `decode_event` for each log, but
69    /// chain-specific crates can override this for parallelism (Rayon) or
70    /// other optimizations.
71    fn decode_batch(
72        &self,
73        logs: &[RawEvent],
74        registry: &dyn SchemaRegistry,
75        mode: ErrorMode,
76        progress: Option<&dyn ProgressCallback>,
77    ) -> Result<BatchDecodeResult, BatchDecodeError> {
78        let mut events = Vec::with_capacity(logs.len());
79        let mut errors = Vec::new();
80
81        for (idx, raw) in logs.iter().enumerate() {
82            let fp = self.fingerprint(raw);
83            let schema = match registry.get_by_fingerprint(&fp) {
84                Some(s) => s,
85                None => {
86                    let err = DecodeError::SchemaNotFound {
87                        fingerprint: fp.to_string(),
88                    };
89                    match mode {
90                        ErrorMode::Skip => {
91                            if let Some(cb) = progress {
92                                cb.on_progress(events.len(), logs.len());
93                            }
94                            continue;
95                        }
96                        ErrorMode::Collect => {
97                            errors.push((idx, err));
98                            if let Some(cb) = progress {
99                                cb.on_progress(events.len(), logs.len());
100                            }
101                            continue;
102                        }
103                        ErrorMode::Throw => {
104                            return Err(BatchDecodeError::ItemFailed {
105                                index: idx,
106                                source: err,
107                            });
108                        }
109                    }
110                }
111            };
112
113            match self.decode_event(raw, &schema) {
114                Ok(event) => {
115                    events.push(event);
116                }
117                Err(err) => match mode {
118                    ErrorMode::Skip => {}
119                    ErrorMode::Collect => errors.push((idx, err)),
120                    ErrorMode::Throw => {
121                        return Err(BatchDecodeError::ItemFailed {
122                            index: idx,
123                            source: err,
124                        });
125                    }
126                },
127            }
128
129            if let Some(cb) = progress {
130                cb.on_progress(events.len(), logs.len());
131            }
132        }
133
134        Ok(BatchDecodeResult { events, errors })
135    }
136
137    /// Whether this decoder can attempt to guess/auto-detect a schema
138    /// from raw bytes when no schema is found in the registry.
139    fn supports_abi_guess(&self) -> bool {
140        false
141    }
142}