live-stream 0.1.0

Consumer SDK for browsing and subscribing to live data feeds. Publishers use the live-feed crate.
Documentation
//! # live-stream
//!
//! Consumer SDK for browsing and subscribing to live data feeds
//! served by the companion [`live-feed`] publisher crate. A consumer
//! connects to a publisher endpoint, fetches the manifest, constructs
//! a subscription, and receives typed batches.

pub mod builder;
pub mod catalog;

pub use builder::SubscriptionBuilder;
pub use catalog::FeedCatalog;

pub use live_data::{
    CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
    FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
    SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
    TransportPreference, TransportTag, WireSchema,
};

#[cfg(test)]
mod tests {
    use super::*;

    fn sample_manifest_json() -> String {
        // Hand-crafted matches the FeedManifest serde shape so the
        // test does not have to reach into live-feed to produce one.
        format!(
            r#"{{
  "protocol_version": {pv},
  "server_version": "test/0.1",
  "feeds": [
    {{
      "name": "binance.trade.btcusdt",
      "schema": {{ "fields": [
        {{"name":"event_time","dtype":"i64","nullable":false}},
        {{"name":"price","dtype":"f64","nullable":false}}
      ] }},
      "transports": ["web_socket", "tcp"],
      "formats": ["arrow"],
      "capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
      "event_time_key": "event_time",
      "tags": ["finance", "ticks"],
      "description": "BTCUSDT trade stream"
    }},
    {{
      "name": "coinbase.match.btc-usd",
      "schema": {{ "fields": [
        {{"name":"event_time","dtype":"i64","nullable":false}}
      ] }},
      "transports": ["web_socket"],
      "formats": ["arrow"],
      "capabilities": {{ "can_project": true, "can_filter": false, "can_sample": false }},
      "event_time_key": "event_time",
      "tags": ["finance"]
    }}
  ]
}}"#,
            pv = PROTOCOL_VERSION
        )
    }

    #[test]
    fn subscription_builder_defaults_match_descriptor_new() {
        let from_builder = SubscriptionBuilder::new("feed").build();
        let from_struct = SubscriptionDescriptor::new("feed");
        assert_eq!(from_builder, from_struct);
    }

    #[test]
    fn subscription_builder_collects_transport_preference() {
        let sd = SubscriptionBuilder::new("feed")
            .prefer_transport(TransportTag::Uds)
            .prefer_transport(TransportTag::Tcp)
            .columns(["a", "b"])
            .format(FormatPreference::Arrow)
            .build();
        assert_eq!(sd.transport_pref.0, vec![TransportTag::Uds, TransportTag::Tcp]);
        assert_eq!(sd.columns, Some(vec!["a".to_string(), "b".to_string()]));
        assert_eq!(sd.format_pref, Some(FormatPreference::Arrow));
    }

    #[test]
    fn feed_catalog_parses_manifest_json() {
        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
        assert_eq!(cat.protocol_version(), PROTOCOL_VERSION);
        assert_eq!(cat.server_version(), "test/0.1");
        assert_eq!(cat.len(), 2);
    }

    #[test]
    fn feed_catalog_finds_by_exact_name() {
        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
        assert!(cat.find("binance.trade.btcusdt").is_some());
        assert!(cat.find("nope").is_none());
    }

    #[test]
    fn feed_catalog_filters_by_tag() {
        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
        let finance: Vec<&str> = cat
            .filter_by_tag("finance")
            .map(|f| f.name.as_str())
            .collect();
        assert_eq!(finance.len(), 2);
        let ticks: Vec<&str> = cat
            .filter_by_tag("ticks")
            .map(|f| f.name.as_str())
            .collect();
        assert_eq!(ticks, vec!["binance.trade.btcusdt"]);
    }

    #[test]
    fn feed_catalog_filters_by_transport() {
        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
        let tcp: Vec<&str> = cat
            .filter_by_transport(TransportTag::Tcp)
            .map(|f| f.name.as_str())
            .collect();
        assert_eq!(tcp, vec!["binance.trade.btcusdt"]);
        let ws: Vec<&str> = cat
            .filter_by_transport(TransportTag::WebSocket)
            .map(|f| f.name.as_str())
            .collect();
        assert_eq!(ws.len(), 2);
    }

    #[test]
    fn feed_catalog_searches_name_and_description() {
        let cat = FeedCatalog::from_json_str(&sample_manifest_json()).unwrap();
        let by_name: Vec<&str> = cat.search("coinbase").map(|f| f.name.as_str()).collect();
        assert_eq!(by_name, vec!["coinbase.match.btc-usd"]);
        let by_desc: Vec<&str> = cat.search("BTCUSDT").map(|f| f.name.as_str()).collect();
        assert_eq!(by_desc, vec!["binance.trade.btcusdt"]);
    }
}