use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tradingview::Result;
use tradingview::events::{CandleData, MarketEvent};
use tradingview::loader::DataLoader;
use tradingview::sink::{callback::CallbackSink, channel::ChannelSink};
use tradingview::source::DataSource;
#[derive(Clone)]
struct MockSource {
events: Vec<Vec<MarketEvent>>,
name: String,
error: Option<String>,
}
impl MockSource {
fn new(events: Vec<Vec<MarketEvent>>) -> Self {
Self {
events,
name: "mock".to_string(),
error: None,
}
}
#[allow(dead_code)]
fn with_error(mut self, error: &str) -> Self {
self.error = Some(error.to_string());
self
}
}
#[async_trait]
impl DataSource for MockSource {
async fn run(
&self,
sink: mpsc::Sender<Vec<MarketEvent>>,
cancel: CancellationToken,
) -> Result<()> {
if let Some(ref err) = self.error {
return Err(tradingview::Error::Internal(ustr::ustr(err)));
}
for batch in &self.events {
if cancel.is_cancelled() {
break;
}
if sink.send(batch.clone()).await.is_err() {
break;
}
tokio::task::yield_now().await;
}
Ok(())
}
fn name(&self) -> &str {
&self.name
}
}
fn make_candle(ts: i64, symbol: &str, o: f64, h: f64, l: f64, c: f64, v: f64) -> MarketEvent {
MarketEvent::Candle(CandleData::new(ts, symbol, "1D", o, h, l, c, v))
}
#[tokio::test]
async fn test_channel_sink_receives_events() {
let (channel_sink, mut rx) = ChannelSink::new(16);
let events = vec![make_candle(
1000,
"AAPL",
150.0,
155.0,
149.0,
153.0,
1_000_000.0,
)];
let source = MockSource::new(vec![events.clone()]);
let mut loader = DataLoader::builder()
.source(source)
.sink(channel_sink)
.build()
.expect("build failed");
loader.start().await.expect("start failed");
let received = rx.recv().await.expect("no events received");
assert_eq!(received.len(), 1);
if let MarketEvent::Candle(c) = &received[0] {
assert_eq!(c.symbol.as_str(), "AAPL");
assert!((c.open - 150.0).abs() < f64::EPSILON);
assert!((c.close - 153.0).abs() < f64::EPSILON);
} else {
panic!("expected Candle event");
}
loader.shutdown().await.expect("shutdown failed");
}
#[tokio::test]
async fn test_callback_sink_receives_events() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let callback = CallbackSink::new("test-cb", move |events: Vec<MarketEvent>| {
let cnt = Arc::clone(&counter_clone);
async move {
cnt.fetch_add(events.len(), Ordering::SeqCst);
Ok(())
}
});
let events = vec![
make_candle(1000, "MSFT", 300.0, 305.0, 299.0, 304.0, 500_000.0),
make_candle(2000, "MSFT", 304.0, 310.0, 303.0, 308.0, 600_000.0),
];
let source = MockSource::new(vec![events]);
let mut loader = DataLoader::builder()
.source(source)
.sink(callback)
.build()
.expect("build failed");
loader.start().await.expect("start failed");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
loader.shutdown().await.expect("shutdown failed");
let count = counter.load(Ordering::SeqCst);
assert_eq!(count, 2, "expected 2 events via callback, got {count}");
}
#[tokio::test]
async fn test_multiple_sinks() {
let (sink1, mut rx1) = ChannelSink::new(16);
let (sink2, mut rx2) = ChannelSink::new(16);
let events = vec![make_candle(
3000,
"TSLA",
200.0,
205.0,
195.0,
202.0,
2_000_000.0,
)];
let source = MockSource::new(vec![events]);
let mut loader = DataLoader::builder()
.source(source)
.sink(sink1)
.sink(sink2)
.build()
.expect("build failed");
loader.start().await.expect("start failed");
let r1 = rx1.recv().await.expect("sink1: no events");
let r2 = rx2.recv().await.expect("sink2: no events");
assert_eq!(r1.len(), 1);
assert_eq!(r2.len(), 1);
loader.shutdown().await.expect("shutdown failed");
}
#[tokio::test]
async fn test_builder_rejects_no_source() {
let (sink, _rx) = ChannelSink::new(1);
let result = DataLoader::<MockSource>::builder().sink(sink).build();
assert!(result.is_err());
let err_msg = format!("{}", result.err().unwrap());
assert!(err_msg.contains("no data source"));
}
#[tokio::test]
async fn test_builder_rejects_no_sinks() {
let source = MockSource::new(vec![]);
let result = DataLoader::builder().source(source).build();
assert!(result.is_err());
let err_msg = format!("{}", result.err().unwrap());
assert!(err_msg.contains("at least one sink"));
}
#[tokio::test]
async fn test_loader_cancel_token() {
let (sink, _rx) = ChannelSink::new(1);
let source = MockSource::new(vec![]);
let loader = DataLoader::builder()
.source(source)
.sink(sink)
.build()
.expect("build failed");
let token = loader.cancel_token();
assert!(!token.is_cancelled());
}
#[tokio::test]
async fn test_start_twice_errors() {
let (sink, mut rx) = ChannelSink::new(1);
let events = vec![make_candle(
5000, "GOOGL", 140.0, 142.0, 139.0, 141.0, 800_000.0,
)];
let source_clone = MockSource::new(vec![events]);
let mut loader = DataLoader::builder()
.source(source_clone)
.sink(sink)
.build()
.expect("build failed");
loader.start().await.expect("first start");
let _ = rx.recv().await;
loader.shutdown().await.expect("first shutdown");
let result = loader.start().await;
assert!(result.is_err());
}
#[test]
fn test_candle_data_helpers() {
let c = CandleData::new(
1_700_000_000,
"AAPL",
"1D",
150.0,
155.0,
149.0,
153.0,
1_000_000.0,
);
assert!(c.is_bullish());
assert!(!c.is_bearish());
assert!((c.typical_price() - (155.0 + 149.0 + 153.0) / 3.0).abs() < 1e-10);
let dt = c.datetime().expect("valid timestamp");
assert_eq!(dt.timestamp(), 1_700_000_000);
}
#[test]
fn test_candle_data_bearish() {
let c = CandleData::new(
1_700_000_000,
"AAPL",
"1D",
155.0,
156.0,
149.0,
150.0,
500_000.0,
);
assert!(!c.is_bullish());
assert!(c.is_bearish());
}
#[test]
fn test_market_event_timestamp() {
let candle = make_candle(999, "X", 1.0, 2.0, 1.0, 2.0, 1.0);
assert_eq!(candle.timestamp(), 999);
}
#[test]
fn test_market_event_symbol() {
let candle = make_candle(1, "TSLA", 1.0, 2.0, 1.0, 2.0, 1.0);
assert_eq!(candle.symbol(), Some("TSLA"));
}