use tokio::{sync::mpsc, task::JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{Result, events::MarketEvent, sink::EventSink, source::DataSource};
/// Sender and receiver halves of one bounded `tokio::sync::mpsc` channel that
/// carries batches (`Vec<MarketEvent>`) of market events.
///
/// The loader keeps one such pair per registered sink.
type EventChannel = (
mpsc::Sender<Vec<MarketEvent>>,
mpsc::Receiver<Vec<MarketEvent>>,
);
/// Tunable settings shared by the loader and its background tasks.
#[derive(Debug, Clone)]
pub struct LoaderConfig {
    /// Buffer size handed to `mpsc::channel` when the loader creates its
    /// bounded channels.
    pub channel_capacity: usize,
    /// Batch size setting.
    ///
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// sources or sinks elsewhere; confirm.
    pub batch_size: usize,
    /// When `true`, a failed sink `accept` or a closed sink channel is logged
    /// and processing continues; when `false`, the affected task returns the
    /// error instead.
    pub continue_on_sink_error: bool,
}

impl Default for LoaderConfig {
    /// Returns the stock configuration: 4096-slot channels, batches of 256,
    /// and sink errors treated as fatal.
    fn default() -> Self {
        let channel_capacity = 4096;
        let batch_size = 256;
        let continue_on_sink_error = false;
        LoaderConfig {
            channel_capacity,
            batch_size,
            continue_on_sink_error,
        }
    }
}
/// Runs one data source and a set of event sinks as tokio tasks, fanning each
/// batch produced by the source out to every sink.
///
/// Constructed through `DataLoaderBuilder`; driven with `start` and `shutdown`.
pub struct DataLoader<S: DataSource = crate::source::tradingview::TradingViewSource> {
/// The source to run. `start` takes it out, so it is `None` afterwards.
source: Option<S>,
/// Registered sinks; `start` moves each one into its own task.
sinks: Vec<Box<dyn EventSink>>,
/// Channel pairs for the sinks; `start` drains them into the spawned tasks.
sink_channels: Vec<EventChannel>,
/// Loader settings (channel capacity, sink error policy, ...).
config: LoaderConfig,
/// Token cloned into every spawned task; cancelled by `shutdown`.
cancel: CancellationToken,
/// Handles of all spawned tasks; joined by `shutdown`.
tasks: JoinSet<Result<()>>,
}
impl<S: DataSource> DataLoader<S> {
    /// Returns an empty [`DataLoaderBuilder`] for this source type.
    pub fn builder() -> DataLoaderBuilder<S> {
        DataLoaderBuilder::new()
    }

    /// Spawns the background tasks: one for the source, one fan-out task that
    /// copies each batch to every sink channel, and one task per sink.
    ///
    /// The call returns as soon as the tasks are spawned; use
    /// [`shutdown`](Self::shutdown) to stop and join them.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if the loader was already started (the source
    /// has been consumed) or if no sink is registered.
    pub async fn start(&mut self) -> Result<()> {
        // The source check comes first on purpose: a successful start drains
        // both the source and the sinks, so a second call must report
        // "already started" rather than "no sinks".
        let source = self
            .source
            .take()
            .ok_or_else(|| crate::Error::Internal(ustr::ustr("loader already started")))?;
        if self.sinks.is_empty() {
            return Err(crate::Error::Internal(ustr::ustr(
                "no sinks registered — add at least one sink before starting",
            )));
        }

        let sink_count = self.sinks.len();
        info!(
            source = %source.name(),
            sink_count,
            channel_capacity = self.config.channel_capacity,
            "starting data loader",
        );

        // Source task: pushes batches into `source_tx` until it finishes or is
        // cancelled.
        let (source_tx, source_rx) =
            mpsc::channel::<Vec<MarketEvent>>(self.config.channel_capacity);
        let cancel_src = self.cancel.clone();
        let source_name = source.name().to_string();
        self.tasks.spawn(async move {
            debug!(source = %source_name, "source task started");
            source.run(source_tx, cancel_src).await.inspect_err(|e| {
                error!(source = %source_name, error = %e, "source failed");
            })
        });

        // Split every channel into its two halves by value. The fan-out task
        // ends up holding the only sender of each sink channel, so the sink
        // tasks observe a closed channel as soon as the fan-out task exits.
        let (sink_txs, sink_rxs): (
            Vec<mpsc::Sender<Vec<MarketEvent>>>,
            Vec<mpsc::Receiver<Vec<MarketEvent>>>,
        ) = std::mem::take(&mut self.sink_channels).into_iter().unzip();

        let fan_cancel = self.cancel.clone();
        let fan_config = self.config.clone();
        self.tasks
            .spawn(async move { fan_out_task(source_rx, sink_txs, fan_config, fan_cancel).await });

        // Pair sink `i` with receiver `i`. The builder registers exactly one
        // channel per sink, so both collections have the same length.
        let sinks = std::mem::take(&mut self.sinks);
        debug_assert_eq!(
            sinks.len(),
            sink_rxs.len(),
            "every sink must have exactly one channel"
        );
        for (idx, (sink, mut rx)) in sinks.into_iter().zip(sink_rxs).enumerate() {
            let cancel_s = self.cancel.clone();
            let sink_name = sink.name().to_string();
            let cont_on_err = self.config.continue_on_sink_error;
            self.tasks.spawn(async move {
                sink_task(idx, sink, &mut rx, sink_name, cont_on_err, cancel_s).await
            });
        }
        Ok(())
    }

    /// Cancels the shared token and waits for every spawned task to finish.
    ///
    /// Task failures and join errors are logged as warnings and are not
    /// propagated: this method always returns `Ok(())`.
    pub async fn shutdown(&mut self) -> Result<()> {
        info!("initiating loader shutdown");
        self.cancel.cancel();
        while let Some(result) = self.tasks.join_next().await {
            match result {
                Ok(Ok(())) => debug!("task completed successfully"),
                Ok(Err(e)) => warn!(error = %e, "task completed with error"),
                Err(e) => warn!(error = %e, "task join error"),
            }
        }
        info!("loader shutdown complete");
        Ok(())
    }

    /// Returns a clone of the loader's cancellation token.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel.clone()
    }
}
async fn fan_out_task(
mut source_rx: mpsc::Receiver<Vec<MarketEvent>>,
sink_txs: Vec<mpsc::Sender<Vec<MarketEvent>>>,
config: LoaderConfig,
cancel: CancellationToken,
) -> Result<()> {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
debug!("fan-out task cancelled");
break;
}
result = source_rx.recv() => {
match result {
Some(events) => {
for (i, tx) in sink_txs.iter().enumerate() {
if let Err(e) = tx.send(events.clone()).await {
if config.continue_on_sink_error {
warn!(sink_index = i, error = %e, "sink channel dropped");
} else {
return Err(crate::Error::Internal(ustr::ustr(
&format!("sink channel {i} closed: {e}")
)));
}
}
}
}
None => {
info!("source channel closed — fan-out exiting");
break;
}
}
}
}
}
Ok(())
}
/// Feeds batches from `rx` into `sink` until the token is cancelled or the
/// channel closes.
///
/// * `idx` / `name` — used for logging only.
/// * `continue_on_error` — when `true`, a failed `accept` is logged and the
///   task keeps running; when `false`, the error ends the task.
/// * `cancel` — checked before the channel (`biased`); once it fires the sink's
///   `shutdown` is invoked and the task exits without reading further batches.
///
/// # Errors
///
/// Returns the error from `sink.accept` when `continue_on_error` is `false`.
/// A failing `shutdown` is logged but does not change the result.
async fn sink_task(
    idx: usize,
    sink: Box<dyn EventSink>,
    rx: &mut mpsc::Receiver<Vec<MarketEvent>>,
    name: String,
    continue_on_error: bool,
    cancel: CancellationToken,
) -> Result<()> {
    debug!(sink_index = idx, sink = %name, "sink task started");
    loop {
        tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                debug!(sink = %name, "sink task cancelled");
                // Shutdown stays best-effort, but a failure is no longer
                // silently discarded.
                if let Err(e) = sink.shutdown(cancel).await {
                    warn!(sink = %name, error = %e, "sink shutdown failed");
                }
                break;
            }
            result = rx.recv() => {
                let Some(events) = result else {
                    // NOTE(review): `shutdown` is not called on this path (nor
                    // when `accept` fails fatally) — confirm sinks do not rely
                    // on it for flushing when the source ends on its own.
                    debug!(sink = %name, "sink channel closed");
                    break;
                };
                if let Err(e) = sink.accept(&events).await {
                    warn!(sink = %name, error = %e, "sink accept failed");
                    if !continue_on_error {
                        return Err(e);
                    }
                }
            }
        }
    }
    debug!(sink = %name, "sink task exiting");
    Ok(())
}
/// Builder that collects a source, sinks and settings and turns them into a
/// `DataLoader` via `build`.
#[must_use]
pub struct DataLoaderBuilder<S: DataSource> {
/// The configured source; `build` fails while this is `None`.
source: Option<S>,
/// Sinks registered so far; `build` requires at least one.
sinks: Vec<Box<dyn EventSink>>,
/// Channel pairs (one per sink) that `build` hands to the `DataLoader`.
sink_channels: Vec<EventChannel>,
/// Settings passed on to the `DataLoader`; starts as `LoaderConfig::default()`.
config: LoaderConfig,
}
impl<S: DataSource> DataLoaderBuilder<S> {
    /// Creates a builder with no source, no sinks and the default
    /// `LoaderConfig`.
    pub fn new() -> Self {
        Self {
            source: None,
            sinks: Vec::new(),
            sink_channels: Vec::new(),
            config: LoaderConfig::default(),
        }
    }

    /// Sets the data source, replacing any previously configured one.
    pub fn source(mut self, source: S) -> Self {
        self.source = Some(source);
        self
    }

    /// Registers a sink. Sinks are numbered in registration order.
    ///
    /// The sink's channel is created later, in [`build`](Self::build), so the
    /// order of `sink` and `channel_capacity` calls does not matter.
    pub fn sink(mut self, sink: impl EventSink) -> Self {
        let boxed: Box<dyn EventSink> = Box::new(sink);
        self.sinks.push(boxed);
        self
    }

    /// Sets the capacity used for every channel of the loader, including the
    /// channels of sinks that were registered before this call.
    pub fn channel_capacity(mut self, capacity: usize) -> Self {
        self.config.channel_capacity = capacity;
        self
    }

    /// Chooses whether sink failures are logged and skipped (`true`) or abort
    /// the affected task (`false`).
    pub fn continue_on_sink_error(mut self, val: bool) -> Self {
        self.config.continue_on_sink_error = val;
        self
    }

    /// Validates the configuration and assembles the `DataLoader`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if no source was configured, no sink was
    /// registered, or the channel capacity is zero.
    pub fn build(self) -> Result<DataLoader<S>> {
        let source = self
            .source
            .ok_or_else(|| crate::Error::Internal(ustr::ustr("no data source configured")))?;
        if self.sinks.is_empty() {
            return Err(crate::Error::Internal(ustr::ustr(
                "at least one sink is required",
            )));
        }

        // `mpsc::channel` panics on a zero capacity; report it as an error.
        let capacity = self.config.channel_capacity;
        if capacity == 0 {
            return Err(crate::Error::Internal(ustr::ustr(
                "channel capacity must be greater than zero",
            )));
        }

        // Create the sink channels only now, so each one gets the final
        // capacity no matter when `channel_capacity` was called relative to
        // `sink`.
        let mut sink_channels = self.sink_channels;
        sink_channels.clear();
        sink_channels.extend(
            self.sinks
                .iter()
                .map(|_| mpsc::channel::<Vec<MarketEvent>>(capacity)),
        );

        Ok(DataLoader {
            source: Some(source),
            sinks: self.sinks,
            sink_channels,
            config: self.config,
            cancel: CancellationToken::new(),
            tasks: JoinSet::new(),
        })
    }
}
impl<S: DataSource> Default for DataLoaderBuilder<S> {
fn default() -> Self {
Self::new()
}
}