use serde_json::Value;
use tokio::sync::mpsc;
use crate::{
Error,
live::{handler::command::Command, models::TradingViewDataEvent},
};
pub const DEFAULT_COMMAND_CHANNEL_CAPACITY: usize = 256;
pub type CommandTx = mpsc::Sender<Command>;
pub type CommandRx = mpsc::Receiver<Command>;
pub trait Handler: Send + Sync + 'static {
fn handle_events(&self, event: TradingViewDataEvent, message: &[Value]);
fn handle_quote_data(&self, message: &[Value]);
fn handle_series_data(&self, event: TradingViewDataEvent, messages: &[Value]);
fn notify_error(&self, error: Error, message: &[Value]);
}
pub trait HandlerFactory: Send + Sync + 'static {
type Handler: Handler;
fn create(&self, command_tx: CommandTx) -> Self::Handler;
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::mpsc;
#[test]
fn test_command_tx_is_bounded() {
let (tx, _rx) = mpsc::channel::<Command>(DEFAULT_COMMAND_CHANNEL_CAPACITY);
let _command_tx: CommandTx = tx;
}
#[test]
fn test_command_rx_is_bounded() {
let (_tx, rx) = mpsc::channel::<Command>(DEFAULT_COMMAND_CHANNEL_CAPACITY);
let _command_rx: CommandRx = rx;
}
#[test]
fn test_default_capacity_is_reasonable() {
assert!(DEFAULT_COMMAND_CHANNEL_CAPACITY >= 64);
assert!(DEFAULT_COMMAND_CHANNEL_CAPACITY <= 4096);
}
#[tokio::test]
async fn test_bounded_channel_backpressure() {
let (tx, mut rx) = mpsc::channel::<u32>(4);
for i in 0..4 {
tx.send(i).await.expect("send should succeed");
}
let consumer = tokio::spawn(async move {
let mut drained = Vec::new();
while let Some(val) = rx.recv().await {
drained.push(val);
if drained.len() == 8 {
break;
}
}
drained
});
for i in 4..8 {
tx.send(i).await.expect("send after drain");
}
drop(tx);
let drained = consumer.await.unwrap();
assert_eq!(drained, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}
#[tokio::test]
async fn test_try_send_backpressure() {
let (tx, mut _rx) = mpsc::channel::<u32>(2);
assert!(tx.try_send(1).is_ok());
assert!(tx.try_send(2).is_ok());
assert!(tx.try_send(3).is_err());
}
struct TestHandler {
events: std::sync::Mutex<Vec<String>>,
}
impl Handler for TestHandler {
fn handle_events(&self, _event: TradingViewDataEvent, message: &[Value]) {
self.events
.lock()
.unwrap()
.push(format!("event: {:?}", message));
}
fn handle_quote_data(&self, message: &[Value]) {
self.events
.lock()
.unwrap()
.push(format!("quote: {:?}", message));
}
fn handle_series_data(&self, _event: TradingViewDataEvent, messages: &[Value]) {
self.events
.lock()
.unwrap()
.push(format!("series: {:?}", messages));
}
fn notify_error(&self, _error: Error, message: &[Value]) {
self.events
.lock()
.unwrap()
.push(format!("error: {:?}", message));
}
}
struct TestHandlerFactory;
impl HandlerFactory for TestHandlerFactory {
type Handler = TestHandler;
fn create(&self, _command_tx: CommandTx) -> Self::Handler {
TestHandler {
events: std::sync::Mutex::new(Vec::new()),
}
}
}
#[test]
fn test_new_handler_compiles_and_works() {
let (_tx, _rx) = mpsc::channel::<Command>(4);
let factory = TestHandlerFactory;
let handler = factory.create(_tx);
handler.handle_events(
TradingViewDataEvent::OnChartData,
&[serde_json::json!({"test": true})],
);
let events = handler.events.lock().unwrap();
assert_eq!(events.len(), 1);
}
#[test]
fn test_handler_is_object_safe() {
let (_tx, _rx) = mpsc::channel::<Command>(4);
let factory = TestHandlerFactory;
let handler = factory.create(_tx);
let _arc: std::sync::Arc<dyn Handler> = std::sync::Arc::new(handler);
}
}