crypto-crawler 4.7.9

A rock-solid cryptocurrency crawler.
Documentation
use crypto_crawler::Message;
use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;

/// Reports whether a crawled message passes the `crypto_msg_parser` check.
///
/// Returns the `is_ok()` outcome of the matching parser for trade, L2 event
/// and funding rate messages. Everything that is not checked counts as a
/// success and yields `true`: messages from the `bitget` and `zb` exchanges,
/// L2 events of quanto markets, and all remaining message types.
pub(crate) fn parse(msg: Message) -> bool {
    // Messages from these exchanges are accepted without being parsed.
    if matches!(msg.exchange.as_str(), "bitget" | "zb") {
        return true;
    }

    // Receive time in the form expected by the L2 and funding rate parsers.
    let received_at = Some(msg.received_at as i64);

    match (msg.msg_type, msg.market_type) {
        (MessageType::Trade, market_type) => {
            crypto_msg_parser::parse_trade(&msg.exchange, market_type, &msg.json).is_ok()
        }
        // crypto-msg-parser doesn't support quanto contracts
        (MessageType::L2Event, MarketType::QuantoSwap | MarketType::QuantoFuture) => true,
        (MessageType::L2Event, market_type) => {
            crypto_msg_parser::parse_l2(&msg.exchange, market_type, &msg.json, received_at)
                .is_ok()
        }
        (MessageType::FundingRate, market_type) => crypto_msg_parser::parse_funding_rate(
            &msg.exchange,
            market_type,
            &msg.json,
            received_at,
        )
        .is_ok(),
        // No check is performed for any other message type.
        _ => true,
    }
}

/// Runs an async crawl function for one symbol and checks its first message.
///
/// Arguments: the crawl function, the exchange, the market type, the symbol
/// and the expected message type. The expansion spawns the crawl function as
/// a Tokio task, blocks on the first message it sends, asserts the message's
/// `exchange`, `market_type` and `msg_type`, and finally asserts that `parse`
/// accepts it. `parse` and `tokio` must be resolvable where the macro is used.
#[allow(unused_macros)]
macro_rules! test_one_symbol {
    ($crawl_func:ident, $exchange:expr, $market_type:expr, $symbol:expr, $msg_type:expr) => {{
        let (sender, receiver) = std::sync::mpsc::channel();
        let requested = vec![$symbol.to_string()];
        // The crawler runs as a background task and reports through the channel.
        let crawler = async move {
            $crawl_func($exchange, $market_type, Some(&requested), sender).await;
        };
        tokio::task::spawn(crawler);

        // Only the first message is inspected.
        let msg = receiver.recv().unwrap();

        assert_eq!(msg.exchange, $exchange.to_string());
        assert_eq!(msg.market_type, $market_type);
        assert_eq!(msg.msg_type, $msg_type);

        // Parsing is wrapped in `block_in_place`, as in the other async macros.
        assert!(tokio::task::block_in_place(move || parse(msg)));
    }};
}

/// Runs an async crawl function without a symbol list and checks its first message.
///
/// Arguments: the crawl function, the exchange, the market type and the
/// expected message type. The crawl function is spawned as a Tokio task with
/// `None` in place of a symbol list; the first message it sends must match
/// the given exchange, market type and message type, and must be accepted by
/// `parse`. `parse` and `tokio` must be resolvable where the macro is used.
#[allow(unused_macros)]
macro_rules! test_all_symbols {
    ($crawl_func:ident, $exchange:expr, $market_type:expr, $msg_type:expr) => {{
        let (sender, receiver) = std::sync::mpsc::channel();
        // No symbol list is supplied (`None`).
        let crawler = async move {
            $crawl_func($exchange, $market_type, None, sender).await;
        };
        tokio::task::spawn(crawler);

        // Only the first message is inspected.
        let msg = receiver.recv().unwrap();

        assert_eq!(msg.exchange, $exchange.to_string());
        assert_eq!(msg.market_type, $market_type);
        assert_eq!(msg.msg_type, $msg_type);

        // Parsing is wrapped in `block_in_place`, as in the other async macros.
        assert!(tokio::task::block_in_place(move || parse(msg)));
    }};
}

/// Runs a blocking crawl function for one symbol and checks its first message.
///
/// Arguments: the crawl function, the exchange, the market type, the symbol
/// and the expected message type. The crawl function is started on its own
/// OS thread; the first message it sends must match the given exchange,
/// market type and message type, and must be accepted by `parse`, which has
/// to be resolvable where the macro is used.
#[allow(unused_macros)]
macro_rules! test_crawl_restful {
    ($crawl_func:ident, $exchange:expr, $market_type:expr, $symbol:expr, $msg_type:expr) => {{
        let (sender, receiver) = std::sync::mpsc::channel();
        let requested = vec![$symbol.to_string()];
        // The crawl function is synchronous, so it gets a dedicated thread.
        let crawler = move || {
            $crawl_func($exchange, $market_type, Some(&requested), sender);
        };
        std::thread::spawn(crawler);

        // Only the first message is inspected.
        let msg = receiver.recv().unwrap();

        assert_eq!(msg.exchange, $exchange.to_string());
        assert_eq!(msg.market_type, $market_type);
        assert_eq!(msg.msg_type, $msg_type);

        assert!(parse(msg));
    }};
}

/// Runs a blocking crawl function without a symbol list and checks its first message.
///
/// Arguments: the crawl function, the exchange, the market type and the
/// expected message type. The crawl function is started on its own OS thread
/// with `None` in place of a symbol list; the first message it sends must
/// match the given exchange, market type and message type, and must be
/// accepted by `parse`, which has to be resolvable where the macro is used.
#[allow(unused_macros)]
macro_rules! test_crawl_restful_all_symbols {
    ($crawl_func:ident, $exchange:expr, $market_type:expr, $msg_type:expr) => {{
        let (sender, receiver) = std::sync::mpsc::channel();
        // No symbol list is supplied (`None`); the crawl runs on its own thread.
        let crawler = move || {
            $crawl_func($exchange, $market_type, None, sender);
        };
        std::thread::spawn(crawler);

        // Only the first message is inspected.
        let msg = receiver.recv().unwrap();

        assert_eq!(msg.exchange, $exchange.to_string());
        assert_eq!(msg.market_type, $market_type);
        assert_eq!(msg.msg_type, $msg_type);

        assert!(parse(msg));
    }};
}

/// Runs `crawl_candlestick` without a symbol list and checks its first message.
///
/// Arguments: the exchange and the market type. `crawl_candlestick` is
/// spawned as a Tokio task with `None` in place of a symbol list; the first
/// message it sends must carry the given exchange and market type, have the
/// type `MessageType::Candlestick`, and be accepted by `parse`.
/// `crawl_candlestick`, `MessageType`, `parse` and `tokio` must be resolvable
/// where the macro is used.
#[allow(unused_macros)]
macro_rules! gen_test_crawl_candlestick {
    ($exchange:expr, $market_type:expr) => {{
        let (sender, receiver) = std::sync::mpsc::channel();
        // No symbol list is supplied (`None`).
        let crawler = async move {
            crawl_candlestick($exchange, $market_type, None, sender).await;
        };
        tokio::task::spawn(crawler);

        // Only the first message is inspected.
        let msg = receiver.recv().unwrap();

        assert_eq!(msg.exchange, $exchange.to_string());
        assert_eq!(msg.market_type, $market_type);
        assert_eq!(msg.msg_type, MessageType::Candlestick);

        // Parsing is wrapped in `block_in_place`, as in the other async macros.
        assert!(tokio::task::block_in_place(move || parse(msg)));
    }};
}

/// Subscribes to one symbol and asserts that at least one message arrives.
///
/// Arguments: the exchange, the market type and the symbol. `subscribe_symbol`
/// is spawned as a Tokio task with the message types `MessageType::Trade` and
/// `MessageType::L2Event`; the expansion then blocks until the first message
/// is received and fails if the channel closes without delivering one.
/// `subscribe_symbol`, `MessageType` and `tokio` must be resolvable where the
/// macro is used.
#[allow(unused_macros)]
macro_rules! gen_test_subscribe_symbol {
    ($exchange:expr, $market_type:expr, $symbol:expr) => {{
        let (tx, rx) = std::sync::mpsc::channel();
        tokio::task::spawn(async move {
            let msg_types = vec![MessageType::Trade, MessageType::L2Event];
            subscribe_symbol($exchange, $market_type, $symbol, &msg_types, tx).await;
        });

        // One message is enough. `recv` blocks until it arrives and yields
        // `Err` only when the sender is dropped without sending anything,
        // which is the same condition the former collect-then-break loop
        // detected through an empty vector.
        assert!(
            rx.recv().is_ok(),
            "channel closed before any message was received"
        );
    }};
}