// Public submodules declared by this file.
pub mod builder;
pub mod catalog;
// Re-export `SubscriptionBuilder` and `FeedCatalog` at this module's root.
pub use builder::SubscriptionBuilder;
pub use catalog::FeedCatalog;
// Re-export the following `live_data` items so they can be imported from
// here alongside the builder and the catalog.
pub use live_data::{
CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
TransportPreference, TransportTag, WireSchema,
};
#[cfg(test)]
mod tests {
use super::*;
/// Builds the manifest JSON document shared by the catalog tests.
///
/// The document carries `PROTOCOL_VERSION` as its `protocol_version`, a
/// `server_version` of `test/0.1`, and two feeds:
///
/// - `binance.trade.btcusdt`: transports `web_socket` and `tcp`, tags
///   `finance` and `ticks`, and a description;
/// - `coinbase.match.btc-usd`: transport `web_socket` only, tag `finance`,
///   and no description.
fn sample_manifest_json() -> String {
    // Captured by name inside the template. Literal JSON braces are doubled
    // because the template is a `format!` string.
    let pv = PROTOCOL_VERSION;
    format!(
        r#"{{
"protocol_version": {pv},
"server_version": "test/0.1",
"feeds": [
{{
"name": "binance.trade.btcusdt",
"schema": {{ "fields": [
{{"name":"event_time","dtype":"i64","nullable":false}},
{{"name":"price","dtype":"f64","nullable":false}}
] }},
"transports": ["web_socket", "tcp"],
"formats": ["arrow"],
"capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
"event_time_key": "event_time",
"tags": ["finance", "ticks"],
"description": "BTCUSDT trade stream"
}},
{{
"name": "coinbase.match.btc-usd",
"schema": {{ "fields": [
{{"name":"event_time","dtype":"i64","nullable":false}}
] }},
"transports": ["web_socket"],
"formats": ["arrow"],
"capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
"event_time_key": "event_time",
"tags": ["finance"]
}}
]
}}"#
    )
}
/// A builder with no options set must yield the same descriptor as
/// `SubscriptionDescriptor::new` for the same feed name.
#[test]
fn subscription_builder_defaults_match_descriptor_new() {
    let expected = SubscriptionDescriptor::new("feed");
    let built = SubscriptionBuilder::new("feed").build();

    assert_eq!(built, expected);
}
/// Transport preferences are recorded in call order, and the requested
/// columns and format end up on the built descriptor.
#[test]
fn subscription_builder_collects_transport_preference() {
    let descriptor = SubscriptionBuilder::new("feed")
        .prefer_transport(TransportTag::Uds)
        .prefer_transport(TransportTag::Tcp)
        .columns(["a", "b"])
        .format(FormatPreference::Arrow)
        .build();

    let expected_transports = vec![TransportTag::Uds, TransportTag::Tcp];
    let expected_columns = Some(vec![String::from("a"), String::from("b")]);
    let expected_format = Some(FormatPreference::Arrow);

    assert_eq!(descriptor.transport_pref.0, expected_transports);
    assert_eq!(descriptor.columns, expected_columns);
    assert_eq!(descriptor.format_pref, expected_format);
}
/// Parsing the sample manifest exposes its protocol version, server version
/// and feed count through the catalog accessors.
#[test]
fn feed_catalog_parses_manifest_json() {
    let json = sample_manifest_json();
    let catalog = FeedCatalog::from_json_str(&json).unwrap();

    assert_eq!(catalog.protocol_version(), PROTOCOL_VERSION);
    assert_eq!(catalog.server_version(), "test/0.1");
    // The sample manifest declares exactly two feeds.
    assert_eq!(catalog.len(), 2);
}
/// `find` returns a feed for a name present in the manifest and nothing for
/// a name that is absent.
#[test]
fn feed_catalog_finds_by_exact_name() {
    let json = sample_manifest_json();
    let catalog = FeedCatalog::from_json_str(&json).unwrap();

    let known = catalog.find("binance.trade.btcusdt");
    assert!(known.is_some());

    let unknown = catalog.find("nope");
    assert!(unknown.is_none());
}
/// Both sample feeds carry the `finance` tag; only the Binance feed carries
/// the `ticks` tag.
#[test]
fn feed_catalog_filters_by_tag() {
    let json = sample_manifest_json();
    let catalog = FeedCatalog::from_json_str(&json).unwrap();

    let finance_names = catalog
        .filter_by_tag("finance")
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(finance_names.len(), 2);

    let tick_names = catalog
        .filter_by_tag("ticks")
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(tick_names, vec!["binance.trade.btcusdt"]);
}
/// Only the Binance feed lists `tcp` as a transport, while both sample feeds
/// list `web_socket`.
#[test]
fn feed_catalog_filters_by_transport() {
    let json = sample_manifest_json();
    let catalog = FeedCatalog::from_json_str(&json).unwrap();

    let tcp_names = catalog
        .filter_by_transport(TransportTag::Tcp)
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(tcp_names, vec!["binance.trade.btcusdt"]);

    let web_socket_names = catalog
        .filter_by_transport(TransportTag::WebSocket)
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(web_socket_names.len(), 2);
}
/// `search` is exercised with one query taken from a feed name (`coinbase`)
/// and one taken from a feed description (`BTCUSDT`).
#[test]
fn feed_catalog_searches_name_and_description() {
    let json = sample_manifest_json();
    let catalog = FeedCatalog::from_json_str(&json).unwrap();

    let name_hits = catalog
        .search("coinbase")
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(name_hits, vec!["coinbase.match.btc-usd"]);

    let description_hits = catalog
        .search("BTCUSDT")
        .map(|feed| feed.name.as_str())
        .collect::<Vec<&str>>();
    assert_eq!(description_hits, vec!["binance.trade.btcusdt"]);
}
}