#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use atelier_data::sources::binance::events::BinanceWssEvent;
use atelier_data::sources::binance::responses::orderbooks::BinanceDepthUpdate;
use atelier_data::sources::binance::responses::trades::BinanceTradeData;
use atelier_data::sources::ExchangeEvent;
use atelier_data::workers::pipeline::{EventPipeline, PassthroughPipeline};
use atelier_data::workers::TopicMessage;
/// Builds a Binance `depthUpdate` [`TopicMessage`] fixture for `topic`.
///
/// `first_id` and `last_id` become the delta's `first_update_id` and
/// `last_update_id`. Everything else (symbol, event time, one bid level and
/// one ask level, `received_at_ns = 0`, exchange `"binance"`) is fixed.
fn make_depth_update_msg(first_id: u64, last_id: u64, topic: &str) -> TopicMessage {
    // A price level is a `[price, quantity]` pair of owned strings.
    let level = |price: &str, qty: &str| [price.to_owned(), qty.to_owned()];

    let delta = BinanceDepthUpdate {
        event_type: String::from("depthUpdate"),
        event_time: 1672304484978,
        symbol: String::from("BTCUSDT"),
        first_update_id: first_id,
        last_update_id: last_id,
        bids: vec![level("21921.73", "0.063")],
        asks: vec![level("21922.00", "0.500")],
    };
    let payload = ExchangeEvent::Binance(BinanceWssEvent::DepthUpdate(delta));

    TopicMessage {
        topic: String::from(topic),
        received_at_ns: 0,
        exchange: String::from("binance"),
        payload,
    }
}
/// Builds a Binance `trade` [`TopicMessage`] fixture for `topic`.
///
/// All trade fields are fixed fixture values; only the topic varies. The
/// message is stamped with `received_at_ns = 0` and exchange `"binance"`.
fn make_trade_msg(topic: &str) -> TopicMessage {
    let fill = BinanceTradeData {
        event_type: String::from("trade"),
        event_time: 1672304484978,
        symbol: String::from("BTCUSDT"),
        trade_id: 12345,
        price: String::from("21921.73"),
        quantity: String::from("0.063"),
        trade_time: 1672304484975,
        is_buyer_maker: true,
    };
    let payload = ExchangeEvent::Binance(BinanceWssEvent::TradeData(fill));

    TopicMessage {
        topic: String::from(topic),
        received_at_ns: 0,
        exchange: String::from("binance"),
        payload,
    }
}
#[tokio::test]
async fn test_passthrough_forwards_all_events() {
let (in_tx, in_rx) = mpsc::channel(16);
let (out_tx, mut out_rx) = mpsc::channel(16);
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let pipeline = Box::new(PassthroughPipeline);
let handle = tokio::spawn(pipeline.run(in_rx, out_tx, shutdown_rx));
for i in 0..5 {
let msg = make_depth_update_msg(i, i + 1, "orderbook.50.BTCUSDT");
in_tx.send(msg).await.unwrap();
}
drop(in_tx);
let mut count = 0;
while let Some(_msg) = out_rx.recv().await {
count += 1;
}
assert_eq!(count, 5, "all 5 events should be forwarded");
handle.await.unwrap();
}
/// Verifies that [`PassthroughPipeline`] terminates once its input channel is closed.
///
/// The test fails if the spawned task does not finish within two seconds, or
/// if it finishes by panicking (reported through the task's `JoinError`).
#[tokio::test]
async fn test_passthrough_closes_on_input_drop() {
    let (in_tx, in_rx) = mpsc::channel(16);
    // `_out_rx` is bound (not discarded) so the output receiver stays open for the whole test.
    let (out_tx, _out_rx) = mpsc::channel(16);
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let pipeline = Box::new(PassthroughPipeline);
    let handle = tokio::spawn(pipeline.run(in_rx, out_tx, shutdown_rx));

    drop(in_tx);

    // Outer layer: did the task finish in time? Inner layer: did it finish cleanly?
    // Checking only the outer layer would let a panicking pipeline pass.
    tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("pipeline should terminate when input closes")
        .expect("pipeline task should not panic");
}
/// Starts a minimal HTTP server on an ephemeral `127.0.0.1` port.
///
/// Every accepted connection is answered with the same `200 OK` JSON body,
/// whatever was requested: `lastUpdateId` is `last_update_id`, plus one bid
/// level and one ask level. Read and write errors on a connection are ignored.
///
/// Returns the handle of the accept-loop task and the base URL
/// (`http://<bound address>`). The loop ends when `accept` fails; callers
/// stop it earlier by aborting the handle.
async fn start_mock_server(
    last_update_id: u64,
) -> (tokio::task::JoinHandle<()>, String) {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let base_url = format!("http://{}", listener.local_addr().unwrap());
    let payload = format!(
        r#"{{"lastUpdateId":{},"bids":[["50000.00","1.000"]],"asks":[["50001.00","2.000"]]}}"#,
        last_update_id
    );

    let accept_loop = tokio::spawn(async move {
        while let Ok((mut conn, _peer)) = listener.accept().await {
            let payload = payload.clone();
            // Serve each connection on its own task so the accept loop is never blocked.
            tokio::spawn(async move {
                // One read is issued before replying; its contents and result are ignored.
                let mut request = [0u8; 4096];
                let _ = conn.read(&mut request).await;

                let response = format!(
                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                    payload.len(),
                    payload
                );
                let _ = conn.write_all(response.as_bytes()).await;
                let _ = conn.shutdown().await;
            });
        }
    });

    (accept_loop, base_url)
}
/// Sends a single trade right after spawning a `BookInitializer` and expects
/// it on the output within three seconds, with its topic unchanged.
///
/// The initializer is pointed at a local mock REST server whose snapshot has
/// `lastUpdateId = 100`.
#[tokio::test]
async fn test_trades_forwarded_during_buffering() {
    use atelier_data::sources::binance::rest::BinanceRestClient;
    use atelier_data::workers::pipeline::book_initializer::BookInitializer;

    let (mock_server, base_url) = start_mock_server(100).await;
    let initializer = BookInitializer::new(
        BinanceRestClient::new(base_url),
        String::from("BTCUSDT"),
        String::from("orderbook.50.BTCUSDT"),
    );

    let (in_tx, in_rx) = mpsc::channel(64);
    let (out_tx, mut out_rx) = mpsc::channel(64);
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let pipeline_task = tokio::spawn(Box::new(initializer).run(in_rx, out_tx, shutdown_rx));

    in_tx
        .send(make_trade_msg("trade.all.BTCUSDT"))
        .await
        .unwrap();

    let forwarded = match tokio::time::timeout(Duration::from_secs(3), out_rx.recv()).await {
        // `None` here would mean the output channel closed before the trade came through.
        Ok(maybe_msg) => maybe_msg.unwrap(),
        Err(_elapsed) => panic!("trade should arrive during buffering"),
    };
    assert_eq!(forwarded.topic, "trade.all.BTCUSDT");

    // Tear down: close the input, wait for the pipeline task, stop the mock server.
    drop(in_tx);
    let _ = pipeline_task.await;
    mock_server.abort();
}
#[tokio::test]
async fn test_deltas_buffered_until_snapshot() {
use atelier_data::sources::binance::rest::BinanceRestClient;
use atelier_data::workers::pipeline::book_initializer::BookInitializer;
let (server_handle, base_url) = start_mock_server(100).await;
let rest = BinanceRestClient::new(base_url);
let initializer = BookInitializer::new(
rest,
"BTCUSDT".to_string(),
"orderbook.50.BTCUSDT".to_string(),
);
let (in_tx, in_rx) = mpsc::channel(64);
let (out_tx, mut out_rx) = mpsc::channel(64);
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(Box::new(initializer).run(in_rx, out_tx, shutdown_rx));
in_tx
.send(make_depth_update_msg(90, 95, "orderbook.50.BTCUSDT"))
.await
.unwrap();
in_tx
.send(make_depth_update_msg(96, 100, "orderbook.50.BTCUSDT"))
.await
.unwrap();
in_tx
.send(make_depth_update_msg(101, 105, "orderbook.50.BTCUSDT"))
.await
.unwrap();
in_tx
.send(make_depth_update_msg(106, 110, "orderbook.50.BTCUSDT"))
.await
.unwrap();
let mut received = Vec::new();
for _ in 0..10 {
match tokio::time::timeout(Duration::from_secs(3), out_rx.recv()).await {
Ok(Some(msg)) => received.push(msg),
_ => break,
}
}
assert!(
!received.is_empty(),
"should have received at least the snapshot"
);
assert!(
matches!(
&received[0].payload,
ExchangeEvent::Binance(BinanceWssEvent::DepthSnapshot(_))
),
"first output should be the synthesised snapshot"
);
let delta_count = received
.iter()
.skip(1)
.filter(|m| {
matches!(
&m.payload,
ExchangeEvent::Binance(BinanceWssEvent::DepthUpdate(_))
)
})
.count();
assert_eq!(delta_count, 2, "only 2 valid deltas should be forwarded");
drop(in_tx);
let _ = handle.await;
server_handle.abort();
}
#[tokio::test]
async fn test_stale_deltas_discarded() {
use atelier_data::sources::binance::rest::BinanceRestClient;
use atelier_data::workers::pipeline::book_initializer::BookInitializer;
let (server_handle, base_url) = start_mock_server(200).await;
let rest = BinanceRestClient::new(base_url);
let initializer = BookInitializer::new(
rest,
"BTCUSDT".to_string(),
"orderbook.50.BTCUSDT".to_string(),
);
let (in_tx, in_rx) = mpsc::channel(64);
let (out_tx, mut out_rx) = mpsc::channel(64);
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(Box::new(initializer).run(in_rx, out_tx, shutdown_rx));
in_tx
.send(make_depth_update_msg(180, 190, "orderbook.50.BTCUSDT"))
.await
.unwrap();
in_tx
.send(make_depth_update_msg(191, 200, "orderbook.50.BTCUSDT"))
.await
.unwrap();
let mut received = Vec::new();
for _ in 0..5 {
match tokio::time::timeout(Duration::from_secs(3), out_rx.recv()).await {
Ok(Some(msg)) => received.push(msg),
_ => break,
}
}
assert_eq!(received.len(), 1, "only snapshot, no stale deltas");
assert!(
matches!(
&received[0].payload,
ExchangeEvent::Binance(BinanceWssEvent::DepthSnapshot(_))
),
"only output should be the snapshot"
);
drop(in_tx);
let _ = handle.await;
server_handle.abort();
}
/// Waits for a `BookInitializer` to emit its snapshot (mock `lastUpdateId = 50`),
/// then sends two later deltas and expects both on the output as `DepthUpdate`s.
///
/// The deltas cover update ids 51-55 and 56-60; each must arrive within two
/// seconds.
#[tokio::test]
async fn test_synced_state_forwards_directly() {
    use atelier_data::sources::binance::rest::BinanceRestClient;
    use atelier_data::workers::pipeline::book_initializer::BookInitializer;

    let (mock_server, base_url) = start_mock_server(50).await;
    let initializer = BookInitializer::new(
        BinanceRestClient::new(base_url),
        String::from("BTCUSDT"),
        String::from("orderbook.50.BTCUSDT"),
    );

    let (in_tx, in_rx) = mpsc::channel(64);
    let (out_tx, mut out_rx) = mpsc::channel(64);
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let pipeline_task = tokio::spawn(Box::new(initializer).run(in_rx, out_tx, shutdown_rx));

    // Give the initializer a head start before waiting for the snapshot.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let snapshot = tokio::time::timeout(Duration::from_secs(3), out_rx.recv())
        .await
        .expect("should receive snapshot")
        .expect("channel should not be closed");
    assert!(matches!(
        &snapshot.payload,
        ExchangeEvent::Binance(BinanceWssEvent::DepthSnapshot(_))
    ));

    // Both deltas are sent before any of them is read back.
    for (first_id, last_id) in [(51, 55), (56, 60)] {
        let delta = make_depth_update_msg(first_id, last_id, "orderbook.50.BTCUSDT");
        in_tx.send(delta).await.unwrap();
    }

    // Receive both outputs first, then check their payloads, in arrival order.
    let mut forwarded = Vec::with_capacity(2);
    for on_timeout in ["delta1 should arrive", "delta2 should arrive"] {
        let msg = tokio::time::timeout(Duration::from_secs(2), out_rx.recv())
            .await
            .expect(on_timeout)
            .unwrap();
        forwarded.push(msg);
    }
    for msg in &forwarded {
        assert!(matches!(
            &msg.payload,
            ExchangeEvent::Binance(BinanceWssEvent::DepthUpdate(_))
        ));
    }

    // Tear down: close the input, wait for the pipeline task, stop the mock server.
    drop(in_tx);
    let _ = pipeline_task.await;
    mock_server.abort();
}
}