use crate::config::workers::OutputSinkConfig;
use crate::snapshots::MarketSnapshot;
use crate::sources::ExchangeEvent;
use crate::workers::topic_publisher::{TopicMessage, TopicRegistry};
/// A destination for data produced by the workers.
///
/// Every hook except [`OutputSink::name`] ships with a default body that
/// ignores its arguments and returns `Ok(())`, so an implementor only
/// overrides the hooks it actually needs.
pub trait OutputSink: Send + Sync {
    /// Handles a single raw exchange event received on `topic`.
    ///
    /// `received_at_ns` is the receive timestamp handed in by the caller.
    /// The default implementation discards all three arguments and succeeds.
    fn emit_raw(
        &self,
        topic: &str,
        event: &ExchangeEvent,
        received_at_ns: u64,
    ) -> anyhow::Result<()> {
        // Explicitly mark each argument as unused.
        let _ = topic;
        let _ = event;
        let _ = received_at_ns;
        Ok(())
    }

    /// Handles one market snapshot.
    ///
    /// The default implementation discards the snapshot and succeeds.
    fn emit_snapshot(&self, snapshot: &MarketSnapshot) -> anyhow::Result<()> {
        let _: &MarketSnapshot = snapshot;
        Ok(())
    }

    /// Persists anything the sink has buffered.
    ///
    /// The default implementation has nothing to flush and succeeds.
    fn flush(&self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Short static identifier of the sink.
    fn name(&self) -> &'static str;
}
/// [`OutputSink`] that forwards raw exchange events to a [`TopicRegistry`].
pub struct ChannelSink {
/// Registry that `emit_raw` publishes every event to.
registry: TopicRegistry,
}
impl ChannelSink {
pub fn new(registry: TopicRegistry) -> Self {
Self { registry }
}
pub fn registry(&self) -> &TopicRegistry {
&self.registry
}
}
impl OutputSink for ChannelSink {
    /// Wraps `event` in a [`TopicMessage`] and publishes it on `topic`.
    ///
    /// Publishing is best-effort: a failed publish is logged at warn level
    /// and the method still returns `Ok(())`.
    fn emit_raw(
        &self,
        topic: &str,
        event: &ExchangeEvent,
        received_at_ns: u64,
    ) -> anyhow::Result<()> {
        // Lower-case exchange label derived from the event variant.
        let exchange_label: &'static str = match event {
            ExchangeEvent::Bybit(_) => "bybit",
            ExchangeEvent::Coinbase(_) => "coinbase",
            ExchangeEvent::Kraken(_) => "kraken",
            ExchangeEvent::Binance(_) => "binance",
        };

        let message = TopicMessage {
            topic: topic.to_owned(),
            received_at_ns,
            exchange: exchange_label.to_owned(),
            payload: event.clone(),
        };

        match self.registry.publish(topic, message) {
            Ok(_) => {}
            Err(e) => {
                tracing::warn!(topic = topic, error = %e, "channel_sink.publish_failed");
            }
        }
        Ok(())
    }

    /// Snapshots are not forwarded by this sink; always succeeds.
    fn emit_snapshot(&self, _snapshot: &MarketSnapshot) -> anyhow::Result<()> {
        Ok(())
    }

    /// Returns `"channel"`.
    fn name(&self) -> &'static str {
        "channel"
    }
}
/// [`OutputSink`] that reports raw events and snapshots as debug-level
/// `tracing` events instead of persisting them.
pub struct TerminalSink;
impl OutputSink for TerminalSink {
    /// Logs the topic and receive timestamp of a raw event at debug level.
    ///
    /// The event payload itself is not logged.
    fn emit_raw(
        &self,
        topic: &str,
        _event: &ExchangeEvent,
        received_at_ns: u64,
    ) -> anyhow::Result<()> {
        tracing::debug!(topic, received_at_ns, "terminal_sink.raw_event");
        Ok(())
    }

    /// Logs a short summary of `snapshot` at debug level: its timestamp,
    /// whether an order book is present, and the number of trades.
    fn emit_snapshot(&self, snapshot: &MarketSnapshot) -> anyhow::Result<()> {
        let has_ob = snapshot.orderbook.is_some();
        let n_trades = snapshot.trades.len();
        tracing::debug!(
            ts_ns = snapshot.ts_ns,
            has_ob,
            n_trades,
            "terminal_sink.snapshot"
        );
        Ok(())
    }

    /// Returns `"terminal"`.
    fn name(&self) -> &'static str {
        "terminal"
    }
}
#[cfg(feature = "parquet")]
/// [`OutputSink`] that buffers market snapshots in memory and writes them out
/// as parquet files when flushed (built only with the `parquet` feature).
pub struct ParquetSink {
/// Base directory under which `flush` creates one subdirectory per data category.
output_dir: String,
/// Snapshots collected by `emit_snapshot` that have not been flushed yet.
snapshot_buffer: std::sync::Mutex<Vec<MarketSnapshot>>,
}
#[cfg(feature = "parquet")]
impl ParquetSink {
    /// Creates a sink that writes below `output_dir`, starting with an empty
    /// snapshot buffer.
    pub fn new(output_dir: String) -> Self {
        let snapshot_buffer = std::sync::Mutex::new(Vec::new());
        ParquetSink {
            output_dir,
            snapshot_buffer,
        }
    }
}
#[cfg(feature = "parquet")]
impl OutputSink for ParquetSink {
    /// Raw events are not persisted by this sink; the call only leaves a
    /// trace-level log entry and succeeds.
    fn emit_raw(
        &self,
        topic: &str,
        _event: &ExchangeEvent,
        _received_at_ns: u64,
    ) -> anyhow::Result<()> {
        tracing::trace!(topic = topic, "parquet_sink.emit_raw (raw events not persisted)");
        Ok(())
    }

    /// Stores a clone of `snapshot` in the in-memory buffer until the next
    /// [`OutputSink::flush`].
    fn emit_snapshot(&self, snapshot: &MarketSnapshot) -> anyhow::Result<()> {
        // A poisoned mutex only means another thread panicked while holding
        // the guard. The buffer is a plain `Vec` that is only pushed to or
        // cleared, so it is still usable: recover it instead of panicking on
        // every subsequent call.
        let mut buf = self
            .snapshot_buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        buf.push(snapshot.clone());
        Ok(())
    }

    /// Writes all buffered snapshots to parquet files and empties the buffer.
    ///
    /// The buffered snapshots are split by category (order books, trades,
    /// liquidations, funding rates, open interest). Each non-empty category
    /// is written into its own subdirectory of the output directory, which
    /// is created on demand. Nothing is written when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Returns the first directory-creation or write error. The buffer is
    /// only cleared after every category was written, so a failed flush
    /// keeps all snapshots for a later retry.
    ///
    /// NOTE(review): categories written before the failing one are written
    /// again on retry — confirm that the resulting duplicate rows are
    /// acceptable downstream.
    fn flush(&self) -> anyhow::Result<()> {
        use crate::funding::io::funding_parquet::write_funding_parquet_timestamped;
        use crate::liquidations::io::liq_parquet::write_liquidations_parquet_timestamped;
        use crate::open_interest::io::oi_parquet::write_oi_parquet_timestamped;
        use crate::orderbooks::io::ob_parquet::write_ob_parquet;
        use crate::trades::io::trades_parquet::write_trades_parquet_timestamped;

        // Creates `base/name` (including missing parents) and returns its
        // path; the error names the directory that could not be created.
        fn ensure_subdir(
            base: &std::path::Path,
            name: &str,
        ) -> anyhow::Result<std::path::PathBuf> {
            use anyhow::Context as _;
            let dir = base.join(name);
            std::fs::create_dir_all(&dir).with_context(|| {
                format!(
                    "parquet_sink: cannot create output directory {}",
                    dir.display()
                )
            })?;
            Ok(dir)
        }

        // See `emit_snapshot`: recover from poisoning rather than panic.
        // NOTE(review): the guard is held for the whole flush, so concurrent
        // `emit_snapshot` calls wait until the file I/O below has finished.
        let mut buf = self
            .snapshot_buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if buf.is_empty() {
            return Ok(());
        }

        // Split the buffered snapshots into one flat list per category.
        let mut orderbooks = Vec::new();
        let mut trades = Vec::new();
        let mut liquidations = Vec::new();
        let mut funding_rates = Vec::new();
        let mut open_interests = Vec::new();
        for snap in buf.iter() {
            if let Some(ob) = &snap.orderbook {
                orderbooks.push(ob.clone());
            }
            trades.extend(snap.trades.iter().cloned());
            liquidations.extend(snap.liquidations.iter().cloned());
            funding_rates.extend(snap.funding_rate.iter().cloned());
            open_interests.extend(snap.open_interest.iter().cloned());
        }

        let output_path = std::path::Path::new(&self.output_dir);

        if !orderbooks.is_empty() {
            let dir = ensure_subdir(output_path, "orderbooks")?;
            let p = write_ob_parquet(&orderbooks, &dir, "sync")?;
            tracing::info!(path = %p.display(), n = orderbooks.len(), "parquet_sink.wrote_orderbooks");
        }
        if !trades.is_empty() {
            let dir = ensure_subdir(output_path, "trades")?;
            let p = write_trades_parquet_timestamped(&trades, &dir, "sync")?;
            tracing::info!(path = %p.display(), n = trades.len(), "parquet_sink.wrote_trades");
        }
        if !liquidations.is_empty() {
            let dir = ensure_subdir(output_path, "liquidations")?;
            let p = write_liquidations_parquet_timestamped(&liquidations, &dir, "sync")?;
            tracing::info!(path = %p.display(), n = liquidations.len(), "parquet_sink.wrote_liquidations");
        }
        if !funding_rates.is_empty() {
            let dir = ensure_subdir(output_path, "fundings")?;
            let p = write_funding_parquet_timestamped(&funding_rates, &dir, "sync")?;
            tracing::info!(path = %p.display(), n = funding_rates.len(), "parquet_sink.wrote_funding");
        }
        if !open_interests.is_empty() {
            let dir = ensure_subdir(output_path, "open_interests")?;
            let p = write_oi_parquet_timestamped(&open_interests, &dir, "sync")?;
            tracing::info!(path = %p.display(), n = open_interests.len(), "parquet_sink.wrote_oi");
        }

        // Every category was written: the buffered snapshots can be dropped.
        let n = buf.len();
        buf.clear();
        tracing::info!(
            dir = self.output_dir.as_str(),
            snapshot_count = n,
            "parquet_sink.flushed"
        );
        Ok(())
    }

    /// Returns `"parquet"`.
    fn name(&self) -> &'static str {
        "parquet"
    }
}
#[cfg(not(feature = "parquet"))]
/// Stand-in for the parquet sink used when the `parquet` feature is disabled;
/// it keeps the same constructor but persists nothing.
pub struct ParquetSink {
// The directory is stored but never read in this build, hence the allow.
#[allow(dead_code)]
output_dir: String,
}
#[cfg(not(feature = "parquet"))]
impl ParquetSink {
pub fn new(output_dir: String) -> Self {
Self { output_dir }
}
}
#[cfg(not(feature = "parquet"))]
impl OutputSink for ParquetSink {
    /// No-op: records a trace-level note that the `parquet` feature is
    /// disabled and succeeds.
    fn emit_raw(
        &self,
        topic: &str,
        _event: &ExchangeEvent,
        _received_at_ns: u64,
    ) -> anyhow::Result<()> {
        tracing::trace!(topic, "parquet_sink.raw_event (parquet feature not enabled)");
        Ok(())
    }

    /// No-op: the snapshot is dropped after a trace-level note.
    fn emit_snapshot(&self, _snapshot: &MarketSnapshot) -> anyhow::Result<()> {
        tracing::trace!("parquet_sink.snapshot (parquet feature not enabled)");
        Ok(())
    }

    /// Returns `"parquet"`, the same name as the feature-enabled sink.
    fn name(&self) -> &'static str {
        "parquet"
    }
}
/// Collection of output sinks; each operation is forwarded to every sink in
/// the order the sinks were added.
pub struct OutputSinkSet {
/// The registered sinks, in insertion order.
sinks: Vec<Box<dyn OutputSink>>,
}
impl OutputSinkSet {
pub fn new() -> Self {
Self { sinks: Vec::new() }
}
pub fn push(&mut self, sink: Box<dyn OutputSink>) {
self.sinks.push(sink);
}
pub fn len(&self) -> usize {
self.sinks.len()
}
pub fn is_empty(&self) -> bool {
self.sinks.is_empty()
}
pub fn emit_raw(
&self,
topic: &str,
event: &ExchangeEvent,
received_at_ns: u64,
) -> anyhow::Result<()> {
for sink in &self.sinks {
if let Err(e) = sink.emit_raw(topic, event, received_at_ns) {
tracing::warn!(
sink = sink.name(),
topic = topic,
error = %e,
"output_sink_set.emit_raw_failed"
);
}
}
Ok(())
}
pub fn emit_snapshot(&self, snapshot: &MarketSnapshot) -> anyhow::Result<()> {
for sink in &self.sinks {
if let Err(e) = sink.emit_snapshot(snapshot) {
tracing::warn!(
sink = sink.name(),
error = %e,
"output_sink_set.emit_snapshot_failed"
);
}
}
Ok(())
}
pub fn flush(&self) -> anyhow::Result<()> {
for sink in &self.sinks {
if let Err(e) = sink.flush() {
tracing::warn!(
sink = sink.name(),
error = %e,
"output_sink_set.flush_failed"
);
}
}
Ok(())
}
}
impl Default for OutputSinkSet {
fn default() -> Self {
Self::new()
}
}
pub fn build_sinks(
configs: &[OutputSinkConfig],
registry: Option<TopicRegistry>,
) -> OutputSinkSet {
let mut set = OutputSinkSet::new();
let mut registry = registry;
for config in configs {
match config {
OutputSinkConfig::Channel => {
if let Some(reg) = registry.take() {
tracing::info!("output.channel_sink_created");
set.push(Box::new(ChannelSink::new(reg)));
} else {
tracing::warn!("output.channel_sink_requested_but_no_registry");
}
}
OutputSinkConfig::Terminal => {
tracing::info!("output.terminal_sink_created");
set.push(Box::new(TerminalSink));
}
OutputSinkConfig::Parquet { dir } => {
tracing::info!(dir = dir.as_str(), "output.parquet_sink_created");
set.push(Box::new(ParquetSink::new(dir.clone())));
}
}
}
set
}