Skip to main content

chaincodec_evm/
batch.rs

1//! Rayon-powered batch decode helpers specific to the EVM decoder.
2//! The main batch logic lives in `EvmDecoder::decode_batch`, but this module
3//! exposes chunk-level utilities for the higher-level batch engine.
4
5use chaincodec_core::{
6    decoder::ErrorMode,
7    error::DecodeError,
8    event::{DecodedEvent, RawEvent},
9    schema::SchemaRegistry,
10};
11use rayon::prelude::*;
12
13use crate::decoder::EvmDecoder;
14use chaincodec_core::decoder::ChainDecoder;
15
16/// Decode a slice of EVM raw events in parallel using Rayon.
17/// Returns `(successes, errors)`.
18pub fn parallel_decode(
19    decoder: &EvmDecoder,
20    logs: &[RawEvent],
21    registry: &dyn SchemaRegistry,
22) -> (Vec<DecodedEvent>, Vec<(usize, DecodeError)>) {
23    let results: Vec<(usize, Result<DecodedEvent, DecodeError>)> = logs
24        .par_iter()
25        .enumerate()
26        .map(|(idx, raw)| {
27            let fp = decoder.fingerprint(raw);
28            match registry.get_by_fingerprint(&fp) {
29                None => (
30                    idx,
31                    Err(DecodeError::SchemaNotFound {
32                        fingerprint: fp.to_string(),
33                    }),
34                ),
35                Some(schema) => (idx, decoder.decode_event(raw, &schema)),
36            }
37        })
38        .collect();
39
40    let mut events = Vec::new();
41    let mut errors = Vec::new();
42    for (idx, r) in results {
43        match r {
44            Ok(e) => events.push(e),
45            Err(e) => errors.push((idx, e)),
46        }
47    }
48    (events, errors)
49}
50
51/// Chunk `logs` into slices of at most `chunk_size` and decode each chunk
52/// in parallel. Returns a flat list of successes and errors.
53pub fn chunked_decode(
54    decoder: &EvmDecoder,
55    logs: &[RawEvent],
56    registry: &dyn SchemaRegistry,
57    chunk_size: usize,
58) -> (Vec<DecodedEvent>, Vec<(usize, DecodeError)>) {
59    let mut all_events = Vec::new();
60    let mut all_errors = Vec::new();
61    let mut offset = 0;
62
63    for chunk in logs.chunks(chunk_size) {
64        let (mut evts, errs) = parallel_decode(decoder, chunk, registry);
65        all_events.append(&mut evts);
66        // Adjust error indices relative to overall slice
67        for (idx, err) in errs {
68            all_errors.push((offset + idx, err));
69        }
70        offset += chunk.len();
71    }
72
73    (all_events, all_errors)
74}