chaincodec_stream/
engine.rs1use crate::config::StreamConfig;
4use crate::listener::BlockListener;
5use chaincodec_core::{
6 decoder::ChainDecoder,
7 error::StreamError,
8 event::DecodedEvent,
9 schema::SchemaRegistry,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::broadcast;
14use tracing::{error, info, warn};
15
16#[derive(Debug, Clone, Default)]
18pub struct StreamMetrics {
19 pub events_decoded: u64,
20 pub events_skipped: u64,
21 pub decode_errors: u64,
22 pub reconnections: u64,
23}
24
25pub struct StreamEngine {
35 config: StreamConfig,
36 listeners: HashMap<String, Arc<dyn BlockListener>>,
37 registry: Arc<dyn SchemaRegistry>,
38 decoders: HashMap<String, Arc<dyn ChainDecoder>>,
39 tx: broadcast::Sender<DecodedEvent>,
40 metrics: Arc<std::sync::Mutex<StreamMetrics>>,
41}
42
43impl StreamEngine {
44 pub fn new(
46 config: StreamConfig,
47 registry: Arc<dyn SchemaRegistry>,
48 ) -> (Self, broadcast::Receiver<DecodedEvent>) {
49 let (tx, rx) = broadcast::channel(config.channel_capacity);
50 let engine = Self {
51 config,
52 listeners: HashMap::new(),
53 registry,
54 decoders: HashMap::new(),
55 tx,
56 metrics: Arc::new(std::sync::Mutex::new(StreamMetrics::default())),
57 };
58 (engine, rx)
59 }
60
61 pub fn add_listener(&mut self, listener: Arc<dyn BlockListener>) {
63 self.listeners
64 .insert(listener.chain_slug().to_string(), listener);
65 }
66
67 pub fn add_decoder(&mut self, chain_slug: impl Into<String>, decoder: Arc<dyn ChainDecoder>) {
69 self.decoders.insert(chain_slug.into(), decoder);
70 }
71
72 pub fn subscribe(&self) -> broadcast::Receiver<DecodedEvent> {
75 self.tx.subscribe()
76 }
77
78 pub fn metrics(&self) -> StreamMetrics {
80 self.metrics.lock().unwrap().clone()
81 }
82
83 pub async fn run(self: Arc<Self>) {
86 info!("StreamEngine starting with {} chains", self.listeners.len());
87
88 for (chain_slug, listener) in &self.listeners {
89 let chain_slug = chain_slug.clone();
90 let listener = Arc::clone(listener);
91 let engine = Arc::clone(&self);
92
93 tokio::spawn(async move {
94 engine.run_listener(chain_slug, listener).await;
95 });
96 }
97 }
98
99 async fn run_listener(
100 &self,
101 chain_slug: String,
102 listener: Arc<dyn BlockListener>,
103 ) {
104 use futures::StreamExt;
105
106 let mut retry = 0u32;
107 loop {
108 info!("Connecting listener for chain: {}", chain_slug);
109 match listener.subscribe().await {
110 Err(e) => {
111 error!("Listener connect error [{chain_slug}]: {e}");
112 retry += 1;
113 self.metrics.lock().unwrap().reconnections += 1;
114 tokio::time::sleep(std::time::Duration::from_millis(500 * 2u64.pow(retry.min(6)))).await;
115 continue;
116 }
117 Ok(mut stream) => {
118 retry = 0;
119 while let Some(item) = stream.next().await {
120 match item {
121 Err(e) => {
122 warn!("Stream error [{chain_slug}]: {e}");
123 break; }
125 Ok(raw) => {
126 self.process_raw_event(raw).await;
127 }
128 }
129 }
130 info!("Stream closed for [{chain_slug}], reconnecting...");
131 self.metrics.lock().unwrap().reconnections += 1;
132 }
133 }
134 }
135 }
136
137 async fn process_raw_event(&self, raw: chaincodec_core::event::RawEvent) {
138 let chain_slug = raw.chain.slug.clone();
139 let decoder = match self.decoders.get(&chain_slug) {
140 Some(d) => d,
141 None => {
142 warn!("No decoder registered for chain: {}", chain_slug);
143 self.metrics.lock().unwrap().events_skipped += 1;
144 return;
145 }
146 };
147
148 let fp = decoder.fingerprint(&raw);
149 let schema = match self.registry.get_by_fingerprint(&fp) {
150 Some(s) => s,
151 None => {
152 if !self.config.skip_unknown {
153 warn!("Unknown schema for fingerprint: {}", fp);
154 }
155 self.metrics.lock().unwrap().events_skipped += 1;
156 return;
157 }
158 };
159
160 if !self.config.schemas.is_empty() && !self.config.schemas.contains(&schema.name) {
162 self.metrics.lock().unwrap().events_skipped += 1;
163 return;
164 }
165
166 match decoder.decode_event(&raw, &schema) {
167 Ok(event) => {
168 self.metrics.lock().unwrap().events_decoded += 1;
169 if let Err(e) = self.tx.send(event) {
170 warn!("No active subscribers: {e}");
172 }
173 }
174 Err(e) => {
175 error!("Decode error [{chain_slug}]: {e}");
176 self.metrics.lock().unwrap().decode_errors += 1;
177 }
178 }
179 }
180}