Skip to main content

chaincodec_batch/
engine.rs

1//! `BatchEngine` — orchestrates chunked, parallel batch decoding.
2
3use crate::request::BatchRequest;
4use chaincodec_core::{
5    decoder::ChainDecoder,
6    error::{BatchDecodeError, DecodeError},
7    event::DecodedEvent,
8    schema::SchemaRegistry,
9};
10use std::sync::Arc;
11use tracing::info;
12
13/// Result of a batch decode job.
14pub struct BatchResult {
15    /// Successfully decoded events
16    pub events: Vec<DecodedEvent>,
17    /// (original_index, error) pairs — only populated in Collect mode
18    pub errors: Vec<(usize, DecodeError)>,
19    /// Total raw events processed
20    pub total_input: usize,
21}
22
23/// Batch decode engine.
24pub struct BatchEngine {
25    registry: Arc<dyn SchemaRegistry>,
26    decoders: std::collections::HashMap<String, Arc<dyn ChainDecoder>>,
27}
28
29impl BatchEngine {
30    pub fn new(registry: Arc<dyn SchemaRegistry>) -> Self {
31        Self {
32            registry,
33            decoders: std::collections::HashMap::new(),
34        }
35    }
36
37    /// Register a decoder for a given chain slug.
38    pub fn add_decoder(
39        &mut self,
40        chain_slug: impl Into<String>,
41        decoder: Arc<dyn ChainDecoder>,
42    ) {
43        self.decoders.insert(chain_slug.into(), decoder);
44    }
45
46    /// Execute a batch decode request.
47    pub fn decode(&self, req: BatchRequest) -> Result<BatchResult, BatchDecodeError> {
48        let decoder = self.decoders.get(&req.chain).ok_or_else(|| {
49            BatchDecodeError::Other(format!("no decoder registered for chain '{}'", req.chain))
50        })?;
51
52        let total_input = req.logs.len();
53        info!(
54            "BatchEngine: decoding {} events for '{}' (chunk_size={})",
55            total_input, req.chain, req.chunk_size
56        );
57
58        let mut all_events: Vec<DecodedEvent> = Vec::with_capacity(total_input);
59        let mut all_errors: Vec<(usize, DecodeError)> = Vec::new();
60        let mut global_offset = 0usize;
61        let mut decoded_so_far = 0usize;
62
63        for chunk in req.logs.chunks(req.chunk_size) {
64            let result = decoder.decode_batch(
65                chunk,
66                self.registry.as_ref(),
67                req.error_mode,
68                req.on_progress.as_ref().map(|f| f.as_ref()),
69            )?;
70
71            decoded_so_far += result.events.len();
72
73            if let Some(cb) = &req.on_progress {
74                cb.on_progress(decoded_so_far, total_input);
75            }
76
77            all_events.extend(result.events);
78            for (local_idx, err) in result.errors {
79                all_errors.push((global_offset + local_idx, err));
80            }
81
82            global_offset += chunk.len();
83        }
84
85        info!(
86            "BatchEngine: complete — {} decoded, {} errors",
87            all_events.len(),
88            all_errors.len()
89        );
90
91        Ok(BatchResult {
92            events: all_events,
93            errors: all_errors,
94            total_input,
95        })
96    }
97}