chaincodec_batch/
engine.rs1use crate::request::BatchRequest;
4use chaincodec_core::{
5 decoder::ChainDecoder,
6 error::{BatchDecodeError, DecodeError},
7 event::DecodedEvent,
8 schema::SchemaRegistry,
9};
10use std::sync::Arc;
11use tracing::info;
12
13pub struct BatchResult {
15 pub events: Vec<DecodedEvent>,
17 pub errors: Vec<(usize, DecodeError)>,
19 pub total_input: usize,
21}
22
23pub struct BatchEngine {
25 registry: Arc<dyn SchemaRegistry>,
26 decoders: std::collections::HashMap<String, Arc<dyn ChainDecoder>>,
27}
28
29impl BatchEngine {
30 pub fn new(registry: Arc<dyn SchemaRegistry>) -> Self {
31 Self {
32 registry,
33 decoders: std::collections::HashMap::new(),
34 }
35 }
36
37 pub fn add_decoder(
39 &mut self,
40 chain_slug: impl Into<String>,
41 decoder: Arc<dyn ChainDecoder>,
42 ) {
43 self.decoders.insert(chain_slug.into(), decoder);
44 }
45
46 pub fn decode(&self, req: BatchRequest) -> Result<BatchResult, BatchDecodeError> {
48 let decoder = self.decoders.get(&req.chain).ok_or_else(|| {
49 BatchDecodeError::Other(format!("no decoder registered for chain '{}'", req.chain))
50 })?;
51
52 let total_input = req.logs.len();
53 info!(
54 "BatchEngine: decoding {} events for '{}' (chunk_size={})",
55 total_input, req.chain, req.chunk_size
56 );
57
58 let mut all_events: Vec<DecodedEvent> = Vec::with_capacity(total_input);
59 let mut all_errors: Vec<(usize, DecodeError)> = Vec::new();
60 let mut global_offset = 0usize;
61 let mut decoded_so_far = 0usize;
62
63 for chunk in req.logs.chunks(req.chunk_size) {
64 let result = decoder.decode_batch(
65 chunk,
66 self.registry.as_ref(),
67 req.error_mode,
68 req.on_progress.as_ref().map(|f| f.as_ref()),
69 )?;
70
71 decoded_so_far += result.events.len();
72
73 if let Some(cb) = &req.on_progress {
74 cb.on_progress(decoded_so_far, total_input);
75 }
76
77 all_events.extend(result.events);
78 for (local_idx, err) in result.errors {
79 all_errors.push((global_offset + local_idx, err));
80 }
81
82 global_offset += chunk.len();
83 }
84
85 info!(
86 "BatchEngine: complete — {} decoded, {} errors",
87 all_events.len(),
88 all_errors.len()
89 );
90
91 Ok(BatchResult {
92 events: all_events,
93 errors: all_errors,
94 total_input,
95 })
96 }
97}