tradingview/loader/mod.rs
1//! Generic event-driven data loader.
2//!
3//! `DataLoader` is the central orchestrator that connects a [`DataSource`](crate::source::DataSource) to
4//! one or more [`EventSink`](crate::sink::EventSink)s. It handles:
5//!
6//! - Source → fan-out task → per-sink tasks pipeline
7//! - Bounded channels for backpressure
8//! - Graceful shutdown via `CancellationToken`
9//! - Error propagation
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use tradingview::loader::DataLoader;
15//! use tradingview::sink::{ChannelSink, CallbackSink};
16//! use tradingview::source::tradingview::TradingViewSource;
17//!
18//! # async fn example() -> tradingview::Result<()> {
19//! let (channel_sink, mut rx) = ChannelSink::new(1024);
20//!
21//! let mut loader = DataLoader::builder()
22//! .source(TradingViewSource::new())
23//! .sink(channel_sink)
24//! .sink(CallbackSink::new("debug", |events| async move {
25//! for event in &events {
26//! tracing::debug!("{:?}", event);
27//! }
28//! Ok(())
29//! }))
30//! .build()?;
31//!
32//! loader.start().await?;
33//! // ... events flow ...
34//! loader.shutdown().await?;
35//! # Ok(())
36//! # }
37//! ```
38
39use tokio::{sync::mpsc, task::JoinSet};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, error, info, warn};
42
43use crate::{Result, events::MarketEvent, sink::EventSink, source::DataSource};
44
45// ---------------------------------------------------------------------------
46// Type aliases
47// ---------------------------------------------------------------------------
48
49/// A channel pair for dispatching event batches to a sink.
50type EventChannel = (
51 mpsc::Sender<Vec<MarketEvent>>,
52 mpsc::Receiver<Vec<MarketEvent>>,
53);
54
55// ---------------------------------------------------------------------------
56// Configuration
57// ---------------------------------------------------------------------------
58
/// Tunable settings for the data loader.
#[derive(Clone, Debug)]
pub struct LoaderConfig {
    /// Capacity of the bounded `mpsc` channels used by the pipeline (the
    /// source → fan-out channel and the per-sink channels).
    ///
    /// Defaults to 4096.
    pub channel_capacity: usize,
    /// Upper bound on the number of events in one batch. Reserved for future
    /// batching logic.
    ///
    /// Defaults to 256.
    pub batch_size: usize,
    /// When `true`, a sink that rejects events does not stop the loader.
    ///
    /// Defaults to `false`.
    pub continue_on_sink_error: bool,
}
72
73impl Default for LoaderConfig {
74 fn default() -> Self {
75 Self {
76 channel_capacity: 4096,
77 batch_size: 256,
78 continue_on_sink_error: false,
79 }
80 }
81}
82
83// ---------------------------------------------------------------------------
84// DataLoader
85// ---------------------------------------------------------------------------
86
87/// Event-driven data loader — connects a source to multiple sinks.
88///
89/// Architecture:
90///
91/// ```text
92/// Source task → source_tx ──→ [fan-out task] ──→ sink_tx_0 → sink task 0
93/// ├───────→ sink_tx_1 → sink task 1
94/// └───────→ sink_tx_N → sink task N
95/// ```
96///
97/// The source produces `Vec<MarketEvent>` batches and sends them via a single
98/// `mpsc` channel. A fan-out task reads from that channel and clones each
99/// batch to every sink's individual channel. Each sink has its own task that
100/// reads from its channel and calls `sink.accept()`.
101pub struct DataLoader<S: DataSource = crate::source::tradingview::TradingViewSource> {
102 source: Option<S>,
103 sinks: Vec<Box<dyn EventSink>>,
104 /// Per-sink (tx, rx) channel pairs.
105 sink_channels: Vec<EventChannel>,
106 config: LoaderConfig,
107 cancel: CancellationToken,
108 tasks: JoinSet<Result<()>>,
109}
110
111impl<S: DataSource> DataLoader<S> {
112 /// Create a new builder with default configuration.
113 pub fn builder() -> DataLoaderBuilder<S> {
114 DataLoaderBuilder::new()
115 }
116
117 /// Start the loader: spawns source, fan-out, and per-sink tasks.
118 ///
119 /// This method consumes the source and sink channels; calling it twice
120 /// returns an error.
121 pub async fn start(&mut self) -> Result<()> {
122 let source = self
123 .source
124 .take()
125 .ok_or_else(|| crate::Error::Internal(ustr::ustr("loader already started")))?;
126
127 if self.sinks.is_empty() {
128 return Err(crate::Error::Internal(ustr::ustr(
129 "no sinks registered — add at least one sink before starting",
130 )));
131 }
132
133 let sink_count = self.sinks.len();
134 info!(
135 source = %source.name(),
136 sink_count,
137 channel_capacity = self.config.channel_capacity,
138 "starting data loader",
139 );
140
141 // ---- Source → fan-out channel ----
142 let (source_tx, source_rx) =
143 mpsc::channel::<Vec<MarketEvent>>(self.config.channel_capacity);
144
145 // ---- Spawn source task ----
146 let cancel_src = self.cancel.clone();
147 let source_name = source.name().to_string();
148 self.tasks.spawn(async move {
149 debug!(source = %source_name, "source task started");
150 source.run(source_tx, cancel_src).await.inspect_err(|e| {
151 error!(source = %source_name, error = %e, "source failed");
152 })
153 });
154
155 // ---- Fan-out task ----
156 let sink_txs: Vec<mpsc::Sender<Vec<MarketEvent>>> = self
157 .sink_channels
158 .iter()
159 .map(|(tx, _)| tx.clone())
160 .collect();
161 let fan_cancel = self.cancel.clone();
162 let fan_config = self.config.clone();
163 self.tasks
164 .spawn(async move { fan_out_task(source_rx, sink_txs, fan_config, fan_cancel).await });
165
166 // ---- Per-sink tasks ----
167 let sink_rxs: Vec<(usize, mpsc::Receiver<Vec<MarketEvent>>)> = self
168 .sink_channels
169 .drain(..)
170 .enumerate()
171 .map(|(i, (_, rx))| (i, rx))
172 .collect();
173
174 let mut sinks = std::mem::take(&mut self.sinks);
175
176 for (idx, mut rx) in sink_rxs {
177 // SAFETY: each sink is unique; we remove them in order.
178 let sink = sinks.remove(0);
179 let cancel_s = self.cancel.clone();
180 let sink_name = sink.name().to_string();
181 let cont_on_err = self.config.continue_on_sink_error;
182
183 self.tasks.spawn(async move {
184 sink_task(idx, sink, &mut rx, sink_name, cont_on_err, cancel_s).await
185 });
186 }
187
188 Ok(())
189 }
190
191 /// Initiate graceful shutdown.
192 ///
193 /// Cancels all spawned tasks and waits for them to finish.
194 pub async fn shutdown(&mut self) -> Result<()> {
195 info!("initiating loader shutdown");
196 self.cancel.cancel();
197
198 while let Some(result) = self.tasks.join_next().await {
199 match result {
200 Ok(Ok(())) => debug!("task completed successfully"),
201 Ok(Err(e)) => warn!(error = %e, "task completed with error"),
202 Err(e) => warn!(error = %e, "task join error"),
203 }
204 }
205
206 info!("loader shutdown complete");
207 Ok(())
208 }
209
210 /// Returns a clone of the cancellation token for external monitoring.
211 pub fn cancel_token(&self) -> CancellationToken {
212 self.cancel.clone()
213 }
214}
215
216// ---------------------------------------------------------------------------
217// Background tasks
218// ---------------------------------------------------------------------------
219
220/// Fan-out: reads from `source_rx` and sends clones to every `sink_tx`.
221async fn fan_out_task(
222 mut source_rx: mpsc::Receiver<Vec<MarketEvent>>,
223 sink_txs: Vec<mpsc::Sender<Vec<MarketEvent>>>,
224 config: LoaderConfig,
225 cancel: CancellationToken,
226) -> Result<()> {
227 loop {
228 tokio::select! {
229 biased;
230
231 _ = cancel.cancelled() => {
232 debug!("fan-out task cancelled");
233 break;
234 }
235
236 result = source_rx.recv() => {
237 match result {
238 Some(events) => {
239 for (i, tx) in sink_txs.iter().enumerate() {
240 if let Err(e) = tx.send(events.clone()).await {
241 if config.continue_on_sink_error {
242 warn!(sink_index = i, error = %e, "sink channel dropped");
243 } else {
244 return Err(crate::Error::Internal(ustr::ustr(
245 &format!("sink channel {i} closed: {e}")
246 )));
247 }
248 }
249 }
250 }
251 None => {
252 info!("source channel closed — fan-out exiting");
253 break;
254 }
255 }
256 }
257 }
258 }
259
260 Ok(())
261}
262
263/// Single sink task: reads from `rx` and calls `sink.accept()`.
264async fn sink_task(
265 idx: usize,
266 sink: Box<dyn EventSink>,
267 rx: &mut mpsc::Receiver<Vec<MarketEvent>>,
268 name: String,
269 continue_on_error: bool,
270 cancel: CancellationToken,
271) -> Result<()> {
272 debug!(sink_index = idx, sink = %name, "sink task started");
273
274 loop {
275 tokio::select! {
276 biased;
277
278 _ = cancel.cancelled() => {
279 debug!(sink = %name, "sink task cancelled");
280 // Give the sink a chance to flush
281 let _ = sink.shutdown(cancel).await;
282 break;
283 }
284
285 result = rx.recv() => {
286 match result {
287 Some(events) => {
288 if let Err(e) = sink.accept(&events).await {
289 warn!(sink = %name, error = %e, "sink accept failed");
290 if !continue_on_error {
291 return Err(e);
292 }
293 }
294 }
295 None => {
296 debug!(sink = %name, "sink channel closed");
297 break;
298 }
299 }
300 }
301 }
302 }
303
304 debug!(sink = %name, "sink task exiting");
305 Ok(())
306}
307
308// ---------------------------------------------------------------------------
309// Builder
310// ---------------------------------------------------------------------------
311
312/// Builder for [`DataLoader`].
313///
314/// # Example
315///
316/// ```rust,ignore
317/// let loader = DataLoader::<TradingViewSource>::builder()
318/// .source(tv_source)
319/// .sink(channel_sink)
320/// .channel_capacity(8192)
321/// .build()?;
322/// ```
323#[must_use]
324pub struct DataLoaderBuilder<S: DataSource> {
325 source: Option<S>,
326 sinks: Vec<Box<dyn EventSink>>,
327 sink_channels: Vec<EventChannel>,
328 config: LoaderConfig,
329}
330
331impl<S: DataSource> DataLoaderBuilder<S> {
332 /// Create a new builder with defaults.
333 pub fn new() -> Self {
334 Self {
335 source: None,
336 sinks: Vec::new(),
337 sink_channels: Vec::new(),
338 config: LoaderConfig::default(),
339 }
340 }
341
342 /// Set the data source (required).
343 pub fn source(mut self, source: S) -> Self {
344 self.source = Some(source);
345 self
346 }
347
348 /// Register an event sink. You can call this multiple times to fan out
349 /// to several sinks.
350 pub fn sink(mut self, sink: impl EventSink) -> Self {
351 let (tx, rx) = mpsc::channel::<Vec<MarketEvent>>(self.config.channel_capacity);
352 self.sink_channels.push((tx, rx));
353 let boxed: Box<dyn EventSink> = Box::new(sink);
354 self.sinks.push(boxed);
355 self
356 }
357
358 /// Override the default channel capacity (4096).
359 pub fn channel_capacity(mut self, capacity: usize) -> Self {
360 self.config.channel_capacity = capacity;
361 self
362 }
363
364 /// If `true`, the loader keeps running even when a sink rejects events.
365 pub fn continue_on_sink_error(mut self, val: bool) -> Self {
366 self.config.continue_on_sink_error = val;
367 self
368 }
369
370 /// Finish building the [`DataLoader`].
371 ///
372 /// # Errors
373 ///
374 /// - No source configured.
375 /// - No sinks registered.
376 pub fn build(self) -> Result<DataLoader<S>> {
377 let source = self
378 .source
379 .ok_or_else(|| crate::Error::Internal(ustr::ustr("no data source configured")))?;
380
381 if self.sinks.is_empty() {
382 return Err(crate::Error::Internal(ustr::ustr(
383 "at least one sink is required",
384 )));
385 }
386
387 Ok(DataLoader {
388 source: Some(source),
389 sinks: self.sinks,
390 sink_channels: self.sink_channels,
391 config: self.config,
392 cancel: CancellationToken::new(),
393 tasks: JoinSet::new(),
394 })
395 }
396}
397
398impl<S: DataSource> Default for DataLoaderBuilder<S> {
399 fn default() -> Self {
400 Self::new()
401 }
402}