pub mod builder;
pub mod local;
pub mod registry;
pub mod validate;
pub use builder::{FeedDescriptorBuilder, FeedManifestBuilder};
pub use local::LocalPublisher;
pub use registry::ManifestRegistry;
pub use validate::{
NegotiatedSubscription, ValidationError, negotiate_subscription, validate_descriptor,
};
pub use live_data::{
CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
TransportPreference, TransportTag, WireSchema,
};
#[cfg(test)]
mod tests {
use super::*;
fn trade_schema() -> WireSchema {
WireSchema::new(vec![
FieldSpec::new("event_time", "i64"),
FieldSpec::new("symbol", "string"),
FieldSpec::new("price", "f64"),
])
}
fn build_default_descriptor() -> FeedDescriptor {
FeedDescriptorBuilder::new("binance.trade.btcusdt", trade_schema())
.transport(TransportTag::WebSocket)
.transport(TransportTag::Tcp)
.format(FormatPreference::Arrow)
.event_time_key("event_time")
.tag("finance")
.tag("ticks")
.description("BTCUSDT trade stream")
.build()
.expect("descriptor valid")
}
#[test]
fn descriptor_builder_validates_transports_present() {
let err = FeedDescriptorBuilder::new("x", trade_schema())
.format(FormatPreference::Arrow)
.build()
.unwrap_err();
assert!(matches!(err, ValidationError::EmptyTransports(_)));
}
#[test]
fn descriptor_builder_validates_event_time_key_in_schema() {
let err = FeedDescriptorBuilder::new("x", trade_schema())
.transport(TransportTag::Tcp)
.format(FormatPreference::Arrow)
.event_time_key("not_in_schema")
.build()
.unwrap_err();
assert!(matches!(err, ValidationError::EventTimeKeyNotInSchema { .. }));
}
#[test]
fn registry_rejects_duplicate_names() {
let mut reg = ManifestRegistry::new("test/0.1");
reg.register(build_default_descriptor()).unwrap();
let err = reg.register(build_default_descriptor()).unwrap_err();
assert!(matches!(err, ValidationError::DuplicateFeedName(n) if n == "binance.trade.btcusdt"));
}
#[test]
fn registry_preserves_insertion_order_in_manifest() {
let mut reg = ManifestRegistry::new("test/0.1");
let a = FeedDescriptorBuilder::new("a", trade_schema())
.transport(TransportTag::Tcp)
.format(FormatPreference::Arrow)
.build()
.unwrap();
let b = FeedDescriptorBuilder::new("b", trade_schema())
.transport(TransportTag::Tcp)
.format(FormatPreference::Arrow)
.build()
.unwrap();
let c = FeedDescriptorBuilder::new("c", trade_schema())
.transport(TransportTag::Tcp)
.format(FormatPreference::Arrow)
.build()
.unwrap();
reg.register(b).unwrap();
reg.register(a).unwrap();
reg.register(c).unwrap();
let m = reg.manifest();
let names: Vec<&str> = m.feeds.iter().map(|f| f.name.as_str()).collect();
assert_eq!(names, vec!["b", "a", "c"]);
}
#[test]
fn registry_round_trips_through_json() {
let mut reg = ManifestRegistry::new("test/0.1");
reg.register(build_default_descriptor()).unwrap();
let json = reg.manifest_json().unwrap();
let parsed: FeedManifest = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.feeds.len(), 1);
assert_eq!(parsed.feeds[0].name, "binance.trade.btcusdt");
assert_eq!(parsed.protocol_version, PROTOCOL_VERSION);
}
fn local_publisher() -> LocalPublisher {
let mut p = LocalPublisher::new("test/0.1", Endpoint::tcp("127.0.0.1:9000"));
p.register(build_default_descriptor()).unwrap();
p
}
#[test]
fn local_publisher_accepts_valid_subscription() {
let p = local_publisher();
let req = SubscriptionDescriptor::new("binance.trade.btcusdt");
match p.handle_subscribe(&req) {
SubscribeResponse::Ack(ack) => {
assert_eq!(ack.format, FormatPreference::Arrow);
assert_eq!(ack.endpoint.transport, TransportTag::WebSocket);
assert_eq!(ack.endpoint.address, "127.0.0.1:9000");
}
SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
_ => unreachable!("non-exhaustive guard"),
}
}
#[test]
fn local_publisher_rejects_unknown_feed() {
let p = local_publisher();
let req = SubscriptionDescriptor::new("no.such.feed");
match p.handle_subscribe(&req) {
SubscribeResponse::Err(e) => assert_eq!(e.code, SubscribeErrorCode::UnknownFeed),
other => panic!("expected unknown-feed error, got {other:?}"),
#[allow(unreachable_patterns)]
_ => unreachable!("non-exhaustive guard"),
}
}
#[test]
fn local_publisher_honours_transport_preference() {
let p = local_publisher();
let req = SubscriptionDescriptor::new("binance.trade.btcusdt")
.transport_pref(TransportPreference::one(TransportTag::Tcp));
match p.handle_subscribe(&req) {
SubscribeResponse::Ack(ack) => assert_eq!(ack.endpoint.transport, TransportTag::Tcp),
SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
_ => unreachable!("non-exhaustive guard"),
}
}
#[test]
fn local_publisher_rejects_unsupported_transport() {
let p = local_publisher();
let req = SubscriptionDescriptor::new("binance.trade.btcusdt")
.transport_pref(TransportPreference::one(TransportTag::Quic));
match p.handle_subscribe(&req) {
SubscribeResponse::Err(e) => {
assert_eq!(e.code, SubscribeErrorCode::UnsupportedTransport)
}
other => panic!("expected unsupported-transport error, got {other:?}"),
#[allow(unreachable_patterns)]
_ => unreachable!("non-exhaustive guard"),
}
}
#[test]
fn local_publisher_rejects_unknown_column() {
let p = local_publisher();
let req =
SubscriptionDescriptor::new("binance.trade.btcusdt").columns(["no_such_column"]);
match p.handle_subscribe(&req) {
SubscribeResponse::Err(e) => {
assert_eq!(e.code, SubscribeErrorCode::UnsupportedCapability)
}
other => panic!("expected unsupported-capability error, got {other:?}"),
#[allow(unreachable_patterns)]
_ => unreachable!("non-exhaustive guard"),
}
}
#[test]
fn local_publisher_accepts_known_columns() {
let p = local_publisher();
let req =
SubscriptionDescriptor::new("binance.trade.btcusdt").columns(["event_time", "price"]);
match p.handle_subscribe(&req) {
SubscribeResponse::Ack(ack) => {
let cols = ack.negotiated_columns.expect("columns present");
assert_eq!(cols, vec!["event_time".to_string(), "price".to_string()]);
}
SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
_ => unreachable!("non-exhaustive guard"),
}
}
}