mod establish_connection;
mod monitor_connection;
use establish_connection::connect;
use monitor_connection::run_stream;
use crate::config::Config;
use chainlink_data_streams_report::feed_id::ID;
use chainlink_data_streams_report::report::Report;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::{
net::TcpStream,
sync::{broadcast, mpsc, Mutex},
time::{sleep, Duration},
};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream as TungsteniteWebSocketStream};
use tracing::{debug, error, info};
/// Default WebSocket connect timeout: 5 seconds.
///
/// Not referenced in this file; presumably consumed by the connection submodules.
pub const DEFAULT_WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Lower bound for the WebSocket reconnect interval: 1 second.
///
/// Not referenced in this file; presumably consumed by the connection submodules.
pub const MIN_WS_RECONNECT_INTERVAL: Duration = Duration::from_secs(1);
/// Upper bound for the WebSocket reconnect interval: 10 seconds.
///
/// Not referenced in this file; presumably consumed by the connection submodules.
pub const MAX_WS_RECONNECT_INTERVAL: Duration = Duration::from_secs(10);
/// Errors returned by the streaming API in this module.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
/// Failure reported by the `tokio_tungstenite` WebSocket layer (converted via `#[from]`).
#[error("WebSocket error: {0}")]
WebSocketError(#[from] tokio_tungstenite::tungstenite::Error),
/// Connection-level failure carrying a free-form message. `Stream::listen` returns it
/// with the message "No connection" when there is no stored connection to start.
#[error("Connection error: {0}")]
ConnectionError(String),
/// HMAC authentication failure from `crate::auth` (converted via `#[from]`).
#[error("Authentication error: {0}")]
AuthError(#[from] crate::auth::HmacError),
/// JSON (de)serialization failure from `serde_json` (converted via `#[from]`).
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
/// Returned by `Stream::read` when the report channel yields `None`.
#[error("Stream closed")]
StreamClosed,
}
/// A single report as handed to callers by `Stream::read`.
///
/// With the serde derives it (de)serializes as an object with one `report` field.
/// NOTE(review): presumably this mirrors the shape of an incoming WebSocket message —
/// confirm against the code that parses messages in `monitor_connection`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WebSocketReport {
/// The wrapped report payload.
pub report: Report,
}
/// Atomic counters shared (via `Arc`) between a `Stream`, `connect` and every `run_stream` task.
///
/// This file only initialises the counters to zero and reads them in `Stream::get_stats`;
/// they are updated elsewhere, so the per-field meanings below are inferred from the names
/// and should be confirmed against the submodules.
struct Stats {
// Presumably the number of reports forwarded to the consumer — TODO confirm.
accepted: AtomicUsize,
// Presumably the number of reports dropped as duplicates — TODO confirm.
deduplicated: AtomicUsize,
// Presumably reconnects of some, but not all, connections — TODO confirm.
partial_reconnects: AtomicUsize,
// Presumably reconnects after every connection was lost — TODO confirm.
full_reconnects: AtomicUsize,
// Presumably the number of connections the configuration asks for — TODO confirm.
configured_connections: AtomicUsize,
// Presumably the number of connections currently alive — TODO confirm.
active_connections: AtomicUsize,
}
/// The WebSocket stream(s) produced by `connect` and consumed by `Stream::listen`.
#[derive(Debug)]
pub enum WebSocketConnection {
/// Exactly one WebSocket stream; `Stream::listen` spawns one task for it.
Single(TungsteniteWebSocketStream<MaybeTlsStream<TcpStream>>),
/// Several WebSocket streams; `Stream::listen` spawns one task per element.
Multiple(Vec<TungsteniteWebSocketStream<MaybeTlsStream<TcpStream>>>),
}
/// Client-side handle for a report stream.
///
/// Built by `Stream::new`, started with `Stream::listen`, read with `Stream::read`
/// and shut down with `Stream::close`.
pub struct Stream {
// Clone of the configuration passed to `new`; cloned again into every spawned task.
config: Config,
// Feed IDs passed to `connect`; cloned into every spawned task.
feed_ids: Vec<ID>,
// Connection(s) established by `new`; taken (left as `None`) by `listen`.
conn: Option<WebSocketConnection>,
// Sending half of the report channel; each spawned task gets a clone.
report_sender: mpsc::Sender<WebSocketReport>,
// Receiving half of the report channel; drained by `read`.
report_receiver: mpsc::Receiver<WebSocketReport>,
// Broadcast channel used by `close`; each spawned task gets its own subscription.
shutdown_sender: broadcast::Sender<()>,
// Shared counters, exposed through `get_stats`.
stats: Arc<Stats>,
// Map shared with every spawned task behind an async mutex.
// NOTE(review): presumably a per-feed high-water mark used for de-duplication —
// confirm in `monitor_connection`.
water_mark: Arc<Mutex<HashMap<String, usize>>>,
}
impl Stream {
    /// Capacity of the bounded channel that carries reports from the worker tasks to `read`.
    const REPORT_CHANNEL_CAPACITY: usize = 100;

    /// How long `close` waits after broadcasting the shutdown signal.
    const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_millis(100);

    /// Establishes the connection(s) for `feed_ids` and returns a stream that has not been
    /// started yet; call [`Stream::listen`] to begin receiving reports.
    ///
    /// # Errors
    /// Propagates any [`StreamError`] returned by `connect`.
    pub async fn new(config: &Config, feed_ids: Vec<ID>) -> Result<Stream, StreamError> {
        let (report_sender, report_receiver) = mpsc::channel(Self::REPORT_CHANNEL_CAPACITY);
        // The initial receiver is discarded; each worker subscribes for itself in `listen`.
        let (shutdown_sender, _) = broadcast::channel(1);
        let stats = Arc::new(Stats {
            accepted: AtomicUsize::new(0),
            deduplicated: AtomicUsize::new(0),
            partial_reconnects: AtomicUsize::new(0),
            full_reconnects: AtomicUsize::new(0),
            configured_connections: AtomicUsize::new(0),
            active_connections: AtomicUsize::new(0),
        });
        let conn = connect(config, &feed_ids, Arc::clone(&stats)).await?;

        Ok(Stream {
            config: config.clone(),
            feed_ids,
            conn: Some(conn),
            report_sender,
            report_receiver,
            shutdown_sender,
            stats,
            water_mark: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Starts one detached Tokio task per WebSocket stream, each running `run_stream`.
    ///
    /// The stored connection is consumed, so a second call fails. The method returns as
    /// soon as the tasks are spawned; their join handles are not retained.
    ///
    /// # Errors
    /// Returns [`StreamError::ConnectionError`] ("No connection") when there is no stored
    /// connection, i.e. `listen` was already called or the stream was closed.
    pub async fn listen(&mut self) -> Result<(), StreamError> {
        let conn = self
            .conn
            .take()
            .ok_or_else(|| StreamError::ConnectionError("No connection".into()))?;

        // Normalise both variants to a list so the spawn logic exists exactly once.
        let streams = match conn {
            WebSocketConnection::Single(stream) => vec![stream],
            WebSocketConnection::Multiple(streams) => streams,
        };

        for stream in streams {
            tokio::spawn(run_stream(
                stream,
                self.report_sender.clone(),
                self.shutdown_sender.subscribe(),
                Arc::clone(&self.stats),
                Arc::clone(&self.water_mark),
                self.config.clone(),
                self.feed_ids.clone(),
            ));
        }
        Ok(())
    }

    /// Waits for the next report delivered by the worker tasks.
    ///
    /// # Errors
    /// Returns [`StreamError::StreamClosed`] once the report channel is closed and drained.
    /// Because this `Stream` holds a sender itself, that only happens after
    /// [`Stream::close`] has closed the receiving half.
    pub async fn read(&mut self) -> Result<WebSocketReport, StreamError> {
        self.report_receiver
            .recv()
            .await
            .ok_or(StreamError::StreamClosed)
    }

    /// Shuts the stream down.
    ///
    /// Broadcasts the shutdown signal to every subscribed worker, waits a fixed grace
    /// period (worker handles are not retained, so they cannot be awaited), then drops any
    /// connection that was never started and closes the receiving half of the report
    /// channel. Reports already buffered can still be drained with [`Stream::read`];
    /// afterwards `read` returns [`StreamError::StreamClosed`] instead of waiting forever.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; a failed broadcast is only logged.
    pub async fn close(&mut self) -> Result<(), StreamError> {
        info!("Closing stream...");
        if let Err(e) = self.shutdown_sender.send(()) {
            debug!("Shutdown signal not sent (no active receivers). Stream may already be closed. Error received: {:?}", e);
        }
        sleep(Self::SHUTDOWN_GRACE_PERIOD).await;

        // A connection that was never handed to `listen` must not be started after close.
        drop(self.conn.take());
        // `self.report_sender` keeps the channel open for as long as `self` lives, so the
        // receiver has to be closed explicitly for `read` to ever observe the end.
        self.report_receiver.close();
        Ok(())
    }

    /// Returns a point-in-time copy of the stream counters.
    ///
    /// Each counter is loaded separately with `SeqCst`, so the snapshot is not atomic
    /// across fields. `total_received` is `accepted + deduplicated`.
    pub fn get_stats(&self) -> StatsSnapshot {
        let load = |counter: &AtomicUsize| counter.load(Ordering::SeqCst);

        let accepted = load(&self.stats.accepted);
        let deduplicated = load(&self.stats.deduplicated);
        StatsSnapshot {
            accepted,
            deduplicated,
            total_received: accepted + deduplicated,
            partial_reconnects: load(&self.stats.partial_reconnects),
            full_reconnects: load(&self.stats.full_reconnects),
            configured_connections: load(&self.stats.configured_connections),
            active_connections: load(&self.stats.active_connections),
        }
    }
}
/// Plain-value copy of the stream counters, as returned by `Stream::get_stats`.
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
/// Value of the `accepted` counter when the snapshot was taken.
pub accepted: usize,
/// Value of the `deduplicated` counter when the snapshot was taken.
pub deduplicated: usize,
/// Sum of `accepted` and `deduplicated`, computed by `Stream::get_stats`.
pub total_received: usize,
/// Value of the `partial_reconnects` counter when the snapshot was taken.
pub partial_reconnects: usize,
/// Value of the `full_reconnects` counter when the snapshot was taken.
pub full_reconnects: usize,
/// Value of the `configured_connections` counter when the snapshot was taken.
pub configured_connections: usize,
/// Value of the `active_connections` counter when the snapshot was taken.
pub active_connections: usize,
}