Skip to main content

live_stream/
lib.rs

1//! # live-stream
2//!
3//! Consumer SDK for browsing and subscribing to live data feeds
4//! served by the companion [`live-feed`] publisher crate. A consumer
5//! connects to a publisher endpoint, fetches the manifest, constructs
6//! a subscription, and receives typed batches.
7
8pub mod builder;
9pub mod catalog;
10
11pub use builder::SubscriptionBuilder;
12pub use catalog::FeedCatalog;
13
14pub use live_data::{
15    CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
16    FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
17    SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
18    TransportPreference, TransportTag, WireSchema,
19};
20
21#[cfg(test)]
22mod tests {
23    use super::*;
24
25    fn sample_manifest_json() -> String {
26        // Hand-crafted matches the FeedManifest serde shape so the
27        // test does not have to reach into live-feed to produce one.
28        format!(
29            r#"{{
30  "protocol_version": {pv},
31  "server_version": "test/0.1",
32  "feeds": [
33    {{
34      "name": "binance.trade.btcusdt",
35      "schema": {{ "fields": [
36        {{"name":"event_time","dtype":"i64","nullable":false}},
37        {{"name":"price","dtype":"f64","nullable":false}}
38      ] }},
39      "transports": ["web_socket", "tcp"],
40      "formats": ["arrow"],
41      "capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
42      "event_time_key": "event_time",
43      "tags": ["finance", "ticks"],
44      "description": "BTCUSDT trade stream"
45    }},
46    {{
47      "name": "coinbase.match.btc-usd",
48      "schema": {{ "fields": [
49        {{"name":"event_time","dtype":"i64","nullable":false}}
50      ] }},
51      "transports": ["web_socket"],
52      "formats": ["arrow"],
53      "capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
54      "event_time_key": "event_time",
55      "tags": ["finance"]
56    }}
57  ]
58}}"#,
59            pv = PROTOCOL_VERSION
60        )
61    }
62
63    #[test]
64    fn subscription_builder_defaults_match_descriptor_new() {
65        let from_builder = SubscriptionBuilder::new("feed").build();
66        let from_struct = SubscriptionDescriptor::new("feed");
67        assert_eq!(from_builder, from_struct);
68    }
69
70    #[test]
71    fn subscription_builder_collects_transport_preference() {
72        let sd = SubscriptionBuilder::new("feed")
73            .prefer_transport(TransportTag::Uds)
74            .prefer_transport(TransportTag::Tcp)
75            .columns(["a", "b"])
76            .format(FormatPreference::Arrow)
77            .build();
78        assert_eq!(sd.transport_pref.0, vec![TransportTag::Uds, TransportTag::Tcp]);
79        assert_eq!(sd.columns, Some(vec!["a".to_string(), "b".to_string()]));
80        assert_eq!(sd.format_pref, Some(FormatPreference::Arrow));
81    }
82
83    #[test]
84    fn feed_catalog_parses_manifest_json() {
85        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
86        assert_eq!(cat.protocol_version(), PROTOCOL_VERSION);
87        assert_eq!(cat.server_version(), "test/0.1");
88        assert_eq!(cat.len(), 2);
89    }
90
91    #[test]
92    fn feed_catalog_finds_by_exact_name() {
93        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
94        assert!(cat.find("binance.trade.btcusdt").is_some());
95        assert!(cat.find("nope").is_none());
96    }
97
98    #[test]
99    fn feed_catalog_filters_by_tag() {
100        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
101        let finance: Vec<&str> = cat
102            .filter_by_tag("finance")
103            .map(|f| f.name.as_str())
104            .collect();
105        assert_eq!(finance.len(), 2);
106        let ticks: Vec<&str> = cat
107            .filter_by_tag("ticks")
108            .map(|f| f.name.as_str())
109            .collect();
110        assert_eq!(ticks, vec!["binance.trade.btcusdt"]);
111    }
112
113    #[test]
114    fn feed_catalog_filters_by_transport() {
115        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
116        let tcp: Vec<&str> = cat
117            .filter_by_transport(TransportTag::Tcp)
118            .map(|f| f.name.as_str())
119            .collect();
120        assert_eq!(tcp, vec!["binance.trade.btcusdt"]);
121        let ws: Vec<&str> = cat
122            .filter_by_transport(TransportTag::WebSocket)
123            .map(|f| f.name.as_str())
124            .collect();
125        assert_eq!(ws.len(), 2);
126    }
127
128    #[test]
129    fn feed_catalog_searches_name_and_description() {
130        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
131        let by_name: Vec<&str> = cat.search("coinbase").map(|f| f.name.as_str()).collect();
132        assert_eq!(by_name, vec!["coinbase.match.btc-usd"]);
133        let by_desc: Vec<&str> = cat.search("BTCUSDT").map(|f| f.name.as_str()).collect();
134        assert_eq!(by_desc, vec!["binance.trade.btcusdt"]);
135    }
136}