Skip to main content

chaincodec_stream/
engine.rs

1//! `StreamEngine` — orchestrates multiple chain listeners and decoder workers.
2
3use crate::config::StreamConfig;
4use crate::listener::BlockListener;
5use chaincodec_core::{
6    decoder::ChainDecoder,
7    error::StreamError,
8    event::DecodedEvent,
9    schema::SchemaRegistry,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::broadcast;
14use tracing::{error, info, warn};
15
/// Point-in-time counters describing what the stream engine has done so far.
///
/// Values are plain running totals; comparing two snapshots with `==` tells
/// whether anything changed between them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamMetrics {
    /// Raw events that were decoded successfully.
    pub events_decoded: u64,
    /// Raw events dropped before decoding: no decoder for the chain, no schema
    /// for the fingerprint, or the schema is not in the configured filter.
    pub events_skipped: u64,
    /// Raw events whose decoder returned an error.
    pub decode_errors: u64,
    /// Failed connection attempts plus streams that ended and had to be reopened.
    pub reconnections: u64,
}
24
25/// The top-level streaming engine.
26///
27/// # Usage
28/// ```no_run
29/// # async fn example() {
30/// use chaincodec_stream::{StreamEngine, StreamConfig};
31/// // ... build engine, call run(), receive from subscriber ...
32/// # }
33/// ```
34pub struct StreamEngine {
35    config: StreamConfig,
36    listeners: HashMap<String, Arc<dyn BlockListener>>,
37    registry: Arc<dyn SchemaRegistry>,
38    decoders: HashMap<String, Arc<dyn ChainDecoder>>,
39    tx: broadcast::Sender<DecodedEvent>,
40    metrics: Arc<std::sync::Mutex<StreamMetrics>>,
41}
42
43impl StreamEngine {
44    /// Create a new `StreamEngine`.
45    pub fn new(
46        config: StreamConfig,
47        registry: Arc<dyn SchemaRegistry>,
48    ) -> (Self, broadcast::Receiver<DecodedEvent>) {
49        let (tx, rx) = broadcast::channel(config.channel_capacity);
50        let engine = Self {
51            config,
52            listeners: HashMap::new(),
53            registry,
54            decoders: HashMap::new(),
55            tx,
56            metrics: Arc::new(std::sync::Mutex::new(StreamMetrics::default())),
57        };
58        (engine, rx)
59    }
60
61    /// Register a chain listener.
62    pub fn add_listener(&mut self, listener: Arc<dyn BlockListener>) {
63        self.listeners
64            .insert(listener.chain_slug().to_string(), listener);
65    }
66
67    /// Register a chain decoder.
68    pub fn add_decoder(&mut self, chain_slug: impl Into<String>, decoder: Arc<dyn ChainDecoder>) {
69        self.decoders.insert(chain_slug.into(), decoder);
70    }
71
72    /// Subscribe to the decoded event stream.
73    /// Call before `run()` to avoid missing events.
74    pub fn subscribe(&self) -> broadcast::Receiver<DecodedEvent> {
75        self.tx.subscribe()
76    }
77
78    /// Returns a snapshot of current metrics.
79    pub fn metrics(&self) -> StreamMetrics {
80        self.metrics.lock().unwrap().clone()
81    }
82
83    /// Start the engine. Spawns one Tokio task per chain listener.
84    /// This method returns immediately; listeners run in the background.
85    pub async fn run(self: Arc<Self>) {
86        info!("StreamEngine starting with {} chains", self.listeners.len());
87
88        for (chain_slug, listener) in &self.listeners {
89            let chain_slug = chain_slug.clone();
90            let listener = Arc::clone(listener);
91            let engine = Arc::clone(&self);
92
93            tokio::spawn(async move {
94                engine.run_listener(chain_slug, listener).await;
95            });
96        }
97    }
98
99    async fn run_listener(
100        &self,
101        chain_slug: String,
102        listener: Arc<dyn BlockListener>,
103    ) {
104        use futures::StreamExt;
105
106        let mut retry = 0u32;
107        loop {
108            info!("Connecting listener for chain: {}", chain_slug);
109            match listener.subscribe().await {
110                Err(e) => {
111                    error!("Listener connect error [{chain_slug}]: {e}");
112                    retry += 1;
113                    self.metrics.lock().unwrap().reconnections += 1;
114                    tokio::time::sleep(std::time::Duration::from_millis(500 * 2u64.pow(retry.min(6)))).await;
115                    continue;
116                }
117                Ok(mut stream) => {
118                    retry = 0;
119                    while let Some(item) = stream.next().await {
120                        match item {
121                            Err(e) => {
122                                warn!("Stream error [{chain_slug}]: {e}");
123                                break; // Reconnect
124                            }
125                            Ok(raw) => {
126                                self.process_raw_event(raw).await;
127                            }
128                        }
129                    }
130                    info!("Stream closed for [{chain_slug}], reconnecting...");
131                    self.metrics.lock().unwrap().reconnections += 1;
132                }
133            }
134        }
135    }
136
137    async fn process_raw_event(&self, raw: chaincodec_core::event::RawEvent) {
138        let chain_slug = raw.chain.slug.clone();
139        let decoder = match self.decoders.get(&chain_slug) {
140            Some(d) => d,
141            None => {
142                warn!("No decoder registered for chain: {}", chain_slug);
143                self.metrics.lock().unwrap().events_skipped += 1;
144                return;
145            }
146        };
147
148        let fp = decoder.fingerprint(&raw);
149        let schema = match self.registry.get_by_fingerprint(&fp) {
150            Some(s) => s,
151            None => {
152                if !self.config.skip_unknown {
153                    warn!("Unknown schema for fingerprint: {}", fp);
154                }
155                self.metrics.lock().unwrap().events_skipped += 1;
156                return;
157            }
158        };
159
160        // Filter by subscribed schemas if configured
161        if !self.config.schemas.is_empty() && !self.config.schemas.contains(&schema.name) {
162            self.metrics.lock().unwrap().events_skipped += 1;
163            return;
164        }
165
166        match decoder.decode_event(&raw, &schema) {
167            Ok(event) => {
168                self.metrics.lock().unwrap().events_decoded += 1;
169                if let Err(e) = self.tx.send(event) {
170                    // Receiver dropped — not a fatal error
171                    warn!("No active subscribers: {e}");
172                }
173            }
174            Err(e) => {
175                error!("Decode error [{chain_slug}]: {e}");
176                self.metrics.lock().unwrap().decode_errors += 1;
177            }
178        }
179    }
180}