live-feed 0.1.0

Publisher SDK for advertising and serving live data feeds. Consumers use the live-stream crate.
Documentation
//! # live-feed
//!
//! Publisher SDK for advertising and serving live data feeds. A
//! publisher declares one or more named feeds, builds a manifest
//! consumers can browse, and accepts or rejects subscription requests
//! expressed as [`SubscriptionDescriptor`]s. Consumers use the
//! companion [`live-stream`] crate.
//!
//! ## What 0.1.0 ships
//!
//! - [`FeedDescriptorBuilder`] - builder for a feed entry.
//! - [`FeedManifestBuilder`] - aggregate descriptors into a manifest.
//! - [`ManifestRegistry`] - keep a live registry of declared feeds and
//!   produce a manifest snapshot, with JSON round-trip.
//! - [`LocalPublisher`] - in-memory publisher that validates incoming
//!   subscriptions against the registry's capabilities. Useful for
//!   tests, examples, and as a reference implementation of the
//!   accept/reject contract.
//! - [`ValidationError`] - reasons a descriptor or subscription is
//!   refused, plus the [`negotiate_subscription`] helper that exposes
//!   the validation engine on its own.
//!
//! ## What 0.2.x adds
//!
//! - `LiveFeedServer` - bind to one or more transports.
//! - `FeedPublisher` - typed handle a producer pushes batches into.
//! - `register_*_source` - declarative entry points for common source
//!   shapes (WebSocket-JSON, file-replay, etc.).
//!
//! [`live-stream`]: https://crates.io/crates/live-stream

// Public submodules. The items most callers need are also re-exported at the
// crate root below.
pub mod builder;
pub mod local;
pub mod registry;
pub mod validate;

// Root-level re-exports of this crate's own entry points: the descriptor and
// manifest builders, the in-memory publisher, the feed registry, and the
// validation helpers together with their error type.
pub use builder::{FeedDescriptorBuilder, FeedManifestBuilder};
pub use local::LocalPublisher;
pub use registry::ManifestRegistry;
pub use validate::{
    NegotiatedSubscription, ValidationError, negotiate_subscription, validate_descriptor,
};

// Re-exports of the protocol types and constants defined in the `live_data`
// crate, making them nameable from this crate's root.
pub use live_data::{
    CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
    FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
    SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
    TransportPreference, TransportTag, WireSchema,
};

#[cfg(test)]
mod tests {
    // Unit tests for the descriptor builder, the manifest registry and the
    // in-memory `LocalPublisher` accept/reject behaviour.

    use super::*;

    /// Feed name carried by [`build_default_descriptor`] and targeted by the
    /// subscription tests.
    const DEFAULT_FEED: &str = "binance.trade.btcusdt";

    /// Three-field schema (`event_time`, `symbol`, `price`) shared by every test.
    fn trade_schema() -> WireSchema {
        WireSchema::new(vec![
            FieldSpec::new("event_time", "i64"),
            FieldSpec::new("symbol", "string"),
            FieldSpec::new("price", "f64"),
        ])
    }

    /// Fully populated descriptor for [`DEFAULT_FEED`]: WebSocket and TCP
    /// transports (declared in that order), Arrow format, an event-time key
    /// that exists in the schema, two tags and a description.
    fn build_default_descriptor() -> FeedDescriptor {
        FeedDescriptorBuilder::new(DEFAULT_FEED, trade_schema())
            .transport(TransportTag::WebSocket)
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("event_time")
            .tag("finance")
            .tag("ticks")
            .description("BTCUSDT trade stream")
            .build()
            .expect("descriptor valid")
    }

    /// Descriptor named `name` with a single TCP transport and the Arrow
    /// format, used where only the feed name matters.
    fn minimal_descriptor(name: &'static str) -> FeedDescriptor {
        FeedDescriptorBuilder::new(name, trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .build()
            .unwrap()
    }

    /// Publisher with [`build_default_descriptor`] registered and a TCP
    /// endpoint at `127.0.0.1:9000`.
    fn local_publisher() -> LocalPublisher {
        let mut p = LocalPublisher::new("test/0.1", Endpoint::tcp("127.0.0.1:9000"));
        p.register(build_default_descriptor()).unwrap();
        p
    }

    /// Building without any transport must fail with `EmptyTransports`.
    #[test]
    fn descriptor_builder_validates_transports_present() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .format(FormatPreference::Arrow)
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EmptyTransports(_)));
    }

    /// An event-time key that names no schema field must be refused.
    #[test]
    fn descriptor_builder_validates_event_time_key_in_schema() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("not_in_schema")
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EventTimeKeyNotInSchema { .. }));
    }

    /// Registering the same feed name twice reports the offending name.
    #[test]
    fn registry_rejects_duplicate_names() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let err = reg.register(build_default_descriptor()).unwrap_err();
        assert!(matches!(err, ValidationError::DuplicateFeedName(n) if n == DEFAULT_FEED));
    }

    /// The manifest lists feeds in registration order.
    #[test]
    fn registry_preserves_insertion_order_in_manifest() {
        let mut reg = ManifestRegistry::new("test/0.1");
        // Deliberately not alphabetical, so a manifest that sorted its feeds
        // would fail the assertion below.
        for name in ["b", "a", "c"] {
            reg.register(minimal_descriptor(name)).unwrap();
        }
        let m = reg.manifest();
        let names: Vec<&str> = m.feeds.iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, vec!["b", "a", "c"]);
    }

    /// The JSON produced by the registry parses back into a `FeedManifest`
    /// with the same feed and the current protocol version.
    #[test]
    fn registry_round_trips_through_json() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let json = reg.manifest_json().unwrap();
        let parsed: FeedManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.feeds.len(), 1);
        assert_eq!(parsed.feeds[0].name, DEFAULT_FEED);
        assert_eq!(parsed.protocol_version, PROTOCOL_VERSION);
    }

    /// A subscription with no preferences is acknowledged with the Arrow
    /// format, the WebSocket transport and the publisher's address.
    #[test]
    fn local_publisher_accepts_valid_subscription() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                assert_eq!(ack.format, FormatPreference::Arrow);
                // NOTE(review): WebSocket is the first transport declared on the
                // descriptor; presumably that is why it wins here - confirm.
                assert_eq!(ack.endpoint.transport, TransportTag::WebSocket);
                assert_eq!(ack.endpoint.address, "127.0.0.1:9000");
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    /// Subscribing to a feed that was never registered yields `UnknownFeed`.
    #[test]
    fn local_publisher_rejects_unknown_feed() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new("no.such.feed");
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => assert_eq!(e.code, SubscribeErrorCode::UnknownFeed),
            // The binding arm is a catch-all, so it also covers any response
            // variant added later; a trailing `_` arm would be dead code.
            other => panic!("expected unknown-feed error, got {other:?}"),
        }
    }

    /// A transport preference the feed supports is honoured.
    #[test]
    fn local_publisher_honours_transport_preference() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Tcp));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => assert_eq!(ack.endpoint.transport, TransportTag::Tcp),
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    /// Requesting a transport the feed does not declare yields
    /// `UnsupportedTransport`.
    #[test]
    fn local_publisher_rejects_unsupported_transport() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Quic));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedTransport)
            }
            // Catch-all binding; see `local_publisher_rejects_unknown_feed`.
            other => panic!("expected unsupported-transport error, got {other:?}"),
        }
    }

    /// Requesting a column that is not in the schema yields
    /// `UnsupportedCapability`.
    #[test]
    fn local_publisher_rejects_unknown_column() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["no_such_column"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedCapability)
            }
            // Catch-all binding; see `local_publisher_rejects_unknown_feed`.
            other => panic!("expected unsupported-capability error, got {other:?}"),
        }
    }

    /// Requested columns that exist in the schema are echoed back, in the
    /// requested order, as the negotiated column list.
    #[test]
    fn local_publisher_accepts_known_columns() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["event_time", "price"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                let cols = ack.negotiated_columns.expect("columns present");
                assert_eq!(cols, vec!["event_time".to_string(), "price".to_string()]);
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }
}