Skip to main content

live_feed/
lib.rs

1//! # live-feed
2//!
3//! Publisher SDK for advertising and serving live data feeds. A
4//! publisher declares one or more named feeds, builds a manifest
5//! consumers can browse, and accepts subscription requests against
6//! [`SubscriptionDescriptor`]s. Consumers are served by the companion
7//! [`live-stream`] crate.
8//!
9//! ## What 0.1.0 ships
10//!
11//! - [`FeedDescriptorBuilder`] - builder for a feed entry.
12//! - [`FeedManifestBuilder`] - aggregate descriptors into a manifest.
13//! - [`ManifestRegistry`] - keep a live registry of declared feeds and
14//!   produce a manifest snapshot, with JSON round-trip.
15//! - [`LocalPublisher`] - in-memory publisher that validates incoming
16//!   subscriptions against the registry's capabilities. Useful for
17//!   tests, examples, and as a reference implementation of the
18//!   accept/reject contract.
19//! - [`ValidationError`] - reasons a descriptor or subscription is
20//!   refused, plus the [`negotiate_subscription`] helper that exposes
21//!   the validation engine on its own.
22//!
23//! ## What 0.2.x adds
24//!
25//! - `LiveFeedServer` - bind to one or more transports.
26//! - `FeedPublisher` - typed handle a producer pushes batches into.
27//! - `register_*_source` - declarative entry points for common source
28//!   shapes (WebSocket-JSON, file-replay, etc.).
29//!
30//! [`live-stream`]: https://crates.io/crates/live-stream
31
32pub mod builder;
33pub mod local;
34pub mod registry;
35pub mod validate;
36
37pub use builder::{FeedDescriptorBuilder, FeedManifestBuilder};
38pub use local::LocalPublisher;
39pub use registry::ManifestRegistry;
40pub use validate::{
41    NegotiatedSubscription, ValidationError, negotiate_subscription, validate_descriptor,
42};
43
44pub use live_data::{
45    CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
46    FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
47    SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
48    TransportPreference, TransportTag, WireSchema,
49};
50
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the feed produced by [`build_default_descriptor`]; shared by
    /// every test that registers or subscribes to that feed.
    const DEFAULT_FEED: &str = "binance.trade.btcusdt";

    /// Three-column trade schema (`event_time`, `symbol`, `price`) used by
    /// every descriptor built in this module.
    fn trade_schema() -> WireSchema {
        WireSchema::new(vec![
            FieldSpec::new("event_time", "i64"),
            FieldSpec::new("symbol", "string"),
            FieldSpec::new("price", "f64"),
        ])
    }

    /// Fully populated descriptor for [`DEFAULT_FEED`]: WebSocket then TCP
    /// transports, Arrow format, `event_time` as the event-time key, two
    /// tags and a description.
    fn build_default_descriptor() -> FeedDescriptor {
        FeedDescriptorBuilder::new(DEFAULT_FEED, trade_schema())
            .transport(TransportTag::WebSocket)
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("event_time")
            .tag("finance")
            .tag("ticks")
            .description("BTCUSDT trade stream")
            .build()
            .expect("descriptor valid")
    }

    /// Bare-bones descriptor named `name`: the trade schema, one TCP
    /// transport and the Arrow format, with nothing optional set.
    fn minimal_descriptor(name: &str) -> FeedDescriptor {
        FeedDescriptorBuilder::new(name, trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .build()
            .expect("minimal descriptor valid")
    }

    /// Publisher advertising a TCP endpoint on `127.0.0.1:9000` with the
    /// default descriptor already registered.
    fn local_publisher() -> LocalPublisher {
        let mut p = LocalPublisher::new("test/0.1", Endpoint::tcp("127.0.0.1:9000"));
        p.register(build_default_descriptor()).unwrap();
        p
    }

    // A descriptor that declares a format but no transport must be refused.
    #[test]
    fn descriptor_builder_validates_transports_present() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .format(FormatPreference::Arrow)
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EmptyTransports(_)));
    }

    // The event-time key has to name a field that exists in the schema.
    #[test]
    fn descriptor_builder_validates_event_time_key_in_schema() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("not_in_schema")
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EventTimeKeyNotInSchema { .. }));
    }

    // Registering the same feed name twice reports the offending name.
    #[test]
    fn registry_rejects_duplicate_names() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let err = reg.register(build_default_descriptor()).unwrap_err();
        assert!(matches!(err, ValidationError::DuplicateFeedName(n) if n == DEFAULT_FEED));
    }

    // Feeds are registered in non-alphabetical order on purpose, so a
    // manifest that sorted by name would fail the assertion below.
    #[test]
    fn registry_preserves_insertion_order_in_manifest() {
        let mut reg = ManifestRegistry::new("test/0.1");
        for name in ["b", "a", "c"] {
            reg.register(minimal_descriptor(name)).unwrap();
        }
        let m = reg.manifest();
        let names: Vec<&str> = m.feeds.iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, ["b", "a", "c"]);
    }

    // The JSON produced by the registry parses back into a `FeedManifest`
    // carrying the registered feed and the current protocol version.
    #[test]
    fn registry_round_trips_through_json() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let json = reg.manifest_json().unwrap();
        let parsed: FeedManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.feeds.len(), 1);
        assert_eq!(parsed.feeds[0].name, DEFAULT_FEED);
        assert_eq!(parsed.protocol_version, PROTOCOL_VERSION);
    }

    // A subscription with no explicit preferences is acknowledged with the
    // Arrow format, the WebSocket transport and the publisher's address.
    // NOTE(review): WebSocket is presumably chosen because it is the first
    // transport declared on the descriptor - confirm against `LocalPublisher`.
    #[test]
    fn local_publisher_accepts_valid_subscription() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                assert_eq!(ack.format, FormatPreference::Arrow);
                assert_eq!(ack.endpoint.transport, TransportTag::WebSocket);
                assert_eq!(ack.endpoint.address, "127.0.0.1:9000");
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    // Subscribing to a feed that was never registered yields `UnknownFeed`.
    #[test]
    fn local_publisher_rejects_unknown_feed() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new("no.such.feed");
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => assert_eq!(e.code, SubscribeErrorCode::UnknownFeed),
            // `other` binds every remaining variant, so no trailing `_` arm is needed.
            other => panic!("expected unknown-feed error, got {other:?}"),
        }
    }

    // An explicit TCP preference overrides the transport picked by default.
    #[test]
    fn local_publisher_honours_transport_preference() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Tcp));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => assert_eq!(ack.endpoint.transport, TransportTag::Tcp),
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    // QUIC is not among the descriptor's transports, so the request is refused.
    #[test]
    fn local_publisher_rejects_unsupported_transport() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Quic));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedTransport)
            }
            // `other` binds every remaining variant, so no trailing `_` arm is needed.
            other => panic!("expected unsupported-transport error, got {other:?}"),
        }
    }

    // Projecting a column that is absent from the schema is refused.
    #[test]
    fn local_publisher_rejects_unknown_column() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["no_such_column"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedCapability)
            }
            // `other` binds every remaining variant, so no trailing `_` arm is needed.
            other => panic!("expected unsupported-capability error, got {other:?}"),
        }
    }

    // A projection made only of schema columns is echoed back in the ack,
    // in the order it was requested.
    #[test]
    fn local_publisher_accepts_known_columns() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["event_time", "price"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                let cols = ack.negotiated_columns.expect("columns present");
                assert_eq!(cols, vec!["event_time".to_string(), "price".to_string()]);
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }
}