Skip to main content

tradingview/loader/
mod.rs

1//! Generic event-driven data loader.
2//!
3//! `DataLoader` is the central orchestrator that connects a [`DataSource`](crate::source::DataSource) to
4//! one or more [`EventSink`](crate::sink::EventSink)s. It handles:
5//!
6//! - Source → fan-out task → per-sink tasks pipeline
7//! - Bounded channels for backpressure
8//! - Graceful shutdown via `CancellationToken`
9//! - Error propagation
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use tradingview::loader::DataLoader;
15//! use tradingview::sink::{ChannelSink, CallbackSink};
16//! use tradingview::source::tradingview::TradingViewSource;
17//!
18//! # async fn example() -> tradingview::Result<()> {
19//! let (channel_sink, mut rx) = ChannelSink::new(1024);
20//!
21//! let mut loader = DataLoader::builder()
22//!     .source(TradingViewSource::new())
23//!     .sink(channel_sink)
24//!     .sink(CallbackSink::new("debug", |events| async move {
25//!         for event in &events {
26//!             tracing::debug!("{:?}", event);
27//!         }
28//!         Ok(())
29//!     }))
30//!     .build()?;
31//!
32//! loader.start().await?;
33//! // ... events flow ...
34//! loader.shutdown().await?;
35//! # Ok(())
36//! # }
37//! ```
38
39use tokio::{sync::mpsc, task::JoinSet};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, error, info, warn};
42
43use crate::{Result, events::MarketEvent, sink::EventSink, source::DataSource};
44
45// ---------------------------------------------------------------------------
46// Type aliases
47// ---------------------------------------------------------------------------
48
49/// A channel pair for dispatching event batches to a sink.
50type EventChannel = (
51    mpsc::Sender<Vec<MarketEvent>>,
52    mpsc::Receiver<Vec<MarketEvent>>,
53);
54
55// ---------------------------------------------------------------------------
56// Configuration
57// ---------------------------------------------------------------------------
58
59/// Configuration for the data loader.
60#[derive(Debug, Clone)]
61pub struct LoaderConfig {
62    /// Size of the bounded channel between source and fan-out task.
63    /// Default: 4096.
64    pub channel_capacity: usize,
65    /// Maximum events per batch (for future batching logic).
66    /// Default: 256.
67    pub batch_size: usize,
68    /// If `true`, the loader continues even if a sink rejects events.
69    /// Default: `false`.
70    pub continue_on_sink_error: bool,
71}
72
73impl Default for LoaderConfig {
74    fn default() -> Self {
75        Self {
76            channel_capacity: 4096,
77            batch_size: 256,
78            continue_on_sink_error: false,
79        }
80    }
81}
82
83// ---------------------------------------------------------------------------
84// DataLoader
85// ---------------------------------------------------------------------------
86
87/// Event-driven data loader — connects a source to multiple sinks.
88///
89/// Architecture:
90///
91/// ```text
92/// Source task → source_tx ──→ [fan-out task] ──→ sink_tx_0 → sink task 0
93///                                        ├───────→ sink_tx_1 → sink task 1
94///                                        └───────→ sink_tx_N → sink task N
95/// ```
96///
97/// The source produces `Vec<MarketEvent>` batches and sends them via a single
98/// `mpsc` channel. A fan-out task reads from that channel and clones each
99/// batch to every sink's individual channel. Each sink has its own task that
100/// reads from its channel and calls `sink.accept()`.
101pub struct DataLoader<S: DataSource = crate::source::tradingview::TradingViewSource> {
102    source: Option<S>,
103    sinks: Vec<Box<dyn EventSink>>,
104    /// Per-sink (tx, rx) channel pairs.
105    sink_channels: Vec<EventChannel>,
106    config: LoaderConfig,
107    cancel: CancellationToken,
108    tasks: JoinSet<Result<()>>,
109}
110
111impl<S: DataSource> DataLoader<S> {
112    /// Create a new builder with default configuration.
113    pub fn builder() -> DataLoaderBuilder<S> {
114        DataLoaderBuilder::new()
115    }
116
117    /// Start the loader: spawns source, fan-out, and per-sink tasks.
118    ///
119    /// This method consumes the source and sink channels; calling it twice
120    /// returns an error.
121    pub async fn start(&mut self) -> Result<()> {
122        let source = self
123            .source
124            .take()
125            .ok_or_else(|| crate::Error::Internal(ustr::ustr("loader already started")))?;
126
127        if self.sinks.is_empty() {
128            return Err(crate::Error::Internal(ustr::ustr(
129                "no sinks registered — add at least one sink before starting",
130            )));
131        }
132
133        let sink_count = self.sinks.len();
134        info!(
135            source = %source.name(),
136            sink_count,
137            channel_capacity = self.config.channel_capacity,
138            "starting data loader",
139        );
140
141        // ---- Source → fan-out channel ----
142        let (source_tx, source_rx) =
143            mpsc::channel::<Vec<MarketEvent>>(self.config.channel_capacity);
144
145        // ---- Spawn source task ----
146        let cancel_src = self.cancel.clone();
147        let source_name = source.name().to_string();
148        self.tasks.spawn(async move {
149            debug!(source = %source_name, "source task started");
150            source.run(source_tx, cancel_src).await.inspect_err(|e| {
151                error!(source = %source_name, error = %e, "source failed");
152            })
153        });
154
155        // ---- Fan-out task ----
156        let sink_txs: Vec<mpsc::Sender<Vec<MarketEvent>>> = self
157            .sink_channels
158            .iter()
159            .map(|(tx, _)| tx.clone())
160            .collect();
161        let fan_cancel = self.cancel.clone();
162        let fan_config = self.config.clone();
163        self.tasks
164            .spawn(async move { fan_out_task(source_rx, sink_txs, fan_config, fan_cancel).await });
165
166        // ---- Per-sink tasks ----
167        let sink_rxs: Vec<(usize, mpsc::Receiver<Vec<MarketEvent>>)> = self
168            .sink_channels
169            .drain(..)
170            .enumerate()
171            .map(|(i, (_, rx))| (i, rx))
172            .collect();
173
174        let mut sinks = std::mem::take(&mut self.sinks);
175
176        for (idx, mut rx) in sink_rxs {
177            // SAFETY: each sink is unique; we remove them in order.
178            let sink = sinks.remove(0);
179            let cancel_s = self.cancel.clone();
180            let sink_name = sink.name().to_string();
181            let cont_on_err = self.config.continue_on_sink_error;
182
183            self.tasks.spawn(async move {
184                sink_task(idx, sink, &mut rx, sink_name, cont_on_err, cancel_s).await
185            });
186        }
187
188        Ok(())
189    }
190
191    /// Initiate graceful shutdown.
192    ///
193    /// Cancels all spawned tasks and waits for them to finish.
194    pub async fn shutdown(&mut self) -> Result<()> {
195        info!("initiating loader shutdown");
196        self.cancel.cancel();
197
198        while let Some(result) = self.tasks.join_next().await {
199            match result {
200                Ok(Ok(())) => debug!("task completed successfully"),
201                Ok(Err(e)) => warn!(error = %e, "task completed with error"),
202                Err(e) => warn!(error = %e, "task join error"),
203            }
204        }
205
206        info!("loader shutdown complete");
207        Ok(())
208    }
209
210    /// Returns a clone of the cancellation token for external monitoring.
211    pub fn cancel_token(&self) -> CancellationToken {
212        self.cancel.clone()
213    }
214}
215
216// ---------------------------------------------------------------------------
217// Background tasks
218// ---------------------------------------------------------------------------
219
220/// Fan-out: reads from `source_rx` and sends clones to every `sink_tx`.
221async fn fan_out_task(
222    mut source_rx: mpsc::Receiver<Vec<MarketEvent>>,
223    sink_txs: Vec<mpsc::Sender<Vec<MarketEvent>>>,
224    config: LoaderConfig,
225    cancel: CancellationToken,
226) -> Result<()> {
227    loop {
228        tokio::select! {
229            biased;
230
231            _ = cancel.cancelled() => {
232                debug!("fan-out task cancelled");
233                break;
234            }
235
236            result = source_rx.recv() => {
237                match result {
238                    Some(events) => {
239                        for (i, tx) in sink_txs.iter().enumerate() {
240                            if let Err(e) = tx.send(events.clone()).await {
241                                if config.continue_on_sink_error {
242                                    warn!(sink_index = i, error = %e, "sink channel dropped");
243                                } else {
244                                    return Err(crate::Error::Internal(ustr::ustr(
245                                        &format!("sink channel {i} closed: {e}")
246                                    )));
247                                }
248                            }
249                        }
250                    }
251                    None => {
252                        info!("source channel closed — fan-out exiting");
253                        break;
254                    }
255                }
256            }
257        }
258    }
259
260    Ok(())
261}
262
263/// Single sink task: reads from `rx` and calls `sink.accept()`.
264async fn sink_task(
265    idx: usize,
266    sink: Box<dyn EventSink>,
267    rx: &mut mpsc::Receiver<Vec<MarketEvent>>,
268    name: String,
269    continue_on_error: bool,
270    cancel: CancellationToken,
271) -> Result<()> {
272    debug!(sink_index = idx, sink = %name, "sink task started");
273
274    loop {
275        tokio::select! {
276            biased;
277
278            _ = cancel.cancelled() => {
279                debug!(sink = %name, "sink task cancelled");
280                // Give the sink a chance to flush
281                let _ = sink.shutdown(cancel).await;
282                break;
283            }
284
285            result = rx.recv() => {
286                match result {
287                    Some(events) => {
288                        if let Err(e) = sink.accept(&events).await {
289                            warn!(sink = %name, error = %e, "sink accept failed");
290                            if !continue_on_error {
291                                return Err(e);
292                            }
293                        }
294                    }
295                    None => {
296                        debug!(sink = %name, "sink channel closed");
297                        break;
298                    }
299                }
300            }
301        }
302    }
303
304    debug!(sink = %name, "sink task exiting");
305    Ok(())
306}
307
308// ---------------------------------------------------------------------------
309// Builder
310// ---------------------------------------------------------------------------
311
312/// Builder for [`DataLoader`].
313///
314/// # Example
315///
316/// ```rust,ignore
317/// let loader = DataLoader::<TradingViewSource>::builder()
318///     .source(tv_source)
319///     .sink(channel_sink)
320///     .channel_capacity(8192)
321///     .build()?;
322/// ```
323#[must_use]
324pub struct DataLoaderBuilder<S: DataSource> {
325    source: Option<S>,
326    sinks: Vec<Box<dyn EventSink>>,
327    sink_channels: Vec<EventChannel>,
328    config: LoaderConfig,
329}
330
331impl<S: DataSource> DataLoaderBuilder<S> {
332    /// Create a new builder with defaults.
333    pub fn new() -> Self {
334        Self {
335            source: None,
336            sinks: Vec::new(),
337            sink_channels: Vec::new(),
338            config: LoaderConfig::default(),
339        }
340    }
341
342    /// Set the data source (required).
343    pub fn source(mut self, source: S) -> Self {
344        self.source = Some(source);
345        self
346    }
347
348    /// Register an event sink. You can call this multiple times to fan out
349    /// to several sinks.
350    pub fn sink(mut self, sink: impl EventSink) -> Self {
351        let (tx, rx) = mpsc::channel::<Vec<MarketEvent>>(self.config.channel_capacity);
352        self.sink_channels.push((tx, rx));
353        let boxed: Box<dyn EventSink> = Box::new(sink);
354        self.sinks.push(boxed);
355        self
356    }
357
358    /// Override the default channel capacity (4096).
359    pub fn channel_capacity(mut self, capacity: usize) -> Self {
360        self.config.channel_capacity = capacity;
361        self
362    }
363
364    /// If `true`, the loader keeps running even when a sink rejects events.
365    pub fn continue_on_sink_error(mut self, val: bool) -> Self {
366        self.config.continue_on_sink_error = val;
367        self
368    }
369
370    /// Finish building the [`DataLoader`].
371    ///
372    /// # Errors
373    ///
374    /// - No source configured.
375    /// - No sinks registered.
376    pub fn build(self) -> Result<DataLoader<S>> {
377        let source = self
378            .source
379            .ok_or_else(|| crate::Error::Internal(ustr::ustr("no data source configured")))?;
380
381        if self.sinks.is_empty() {
382            return Err(crate::Error::Internal(ustr::ustr(
383                "at least one sink is required",
384            )));
385        }
386
387        Ok(DataLoader {
388            source: Some(source),
389            sinks: self.sinks,
390            sink_channels: self.sink_channels,
391            config: self.config,
392            cancel: CancellationToken::new(),
393            tasks: JoinSet::new(),
394        })
395    }
396}
397
398impl<S: DataSource> Default for DataLoaderBuilder<S> {
399    fn default() -> Self {
400        Self::new()
401    }
402}