1use chaincodec_core::{
6 decoder::ErrorMode,
7 error::DecodeError,
8 event::{DecodedEvent, RawEvent},
9 schema::SchemaRegistry,
10};
11use rayon::prelude::*;
12
13use crate::decoder::EvmDecoder;
14use chaincodec_core::decoder::ChainDecoder;
15
16pub fn parallel_decode(
19 decoder: &EvmDecoder,
20 logs: &[RawEvent],
21 registry: &dyn SchemaRegistry,
22) -> (Vec<DecodedEvent>, Vec<(usize, DecodeError)>) {
23 let results: Vec<(usize, Result<DecodedEvent, DecodeError>)> = logs
24 .par_iter()
25 .enumerate()
26 .map(|(idx, raw)| {
27 let fp = decoder.fingerprint(raw);
28 match registry.get_by_fingerprint(&fp) {
29 None => (
30 idx,
31 Err(DecodeError::SchemaNotFound {
32 fingerprint: fp.to_string(),
33 }),
34 ),
35 Some(schema) => (idx, decoder.decode_event(raw, &schema)),
36 }
37 })
38 .collect();
39
40 let mut events = Vec::new();
41 let mut errors = Vec::new();
42 for (idx, r) in results {
43 match r {
44 Ok(e) => events.push(e),
45 Err(e) => errors.push((idx, e)),
46 }
47 }
48 (events, errors)
49}
50
51pub fn chunked_decode(
54 decoder: &EvmDecoder,
55 logs: &[RawEvent],
56 registry: &dyn SchemaRegistry,
57 chunk_size: usize,
58) -> (Vec<DecodedEvent>, Vec<(usize, DecodeError)>) {
59 let mut all_events = Vec::new();
60 let mut all_errors = Vec::new();
61 let mut offset = 0;
62
63 for chunk in logs.chunks(chunk_size) {
64 let (mut evts, errs) = parallel_decode(decoder, chunk, registry);
65 all_events.append(&mut evts);
66 for (idx, err) in errs {
68 all_errors.push((offset + idx, err));
69 }
70 offset += chunk.len();
71 }
72
73 (all_events, all_errors)
74}