1pub mod builder;
33pub mod local;
34pub mod registry;
35pub mod validate;
36
37pub use builder::{FeedDescriptorBuilder, FeedManifestBuilder};
38pub use local::LocalPublisher;
39pub use registry::ManifestRegistry;
40pub use validate::{
41 NegotiatedSubscription, ValidationError, negotiate_subscription, validate_descriptor,
42};
43
44pub use live_data::{
45 CONTROL_TAG_FEEDS, CONTROL_TAG_SUBSCRIBE, Capabilities, Endpoint, Error, FeedDescriptor,
46 FeedManifest, FieldSpec, FilterExpr, FormatPreference, PROTOCOL_VERSION, Result, Sampling,
47 SubscribeAck, SubscribeError, SubscribeErrorCode, SubscribeResponse, SubscriptionDescriptor,
48 TransportPreference, TransportTag, WireSchema,
49};
50
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the feed produced by [`build_default_descriptor`].
    const DEFAULT_FEED: &str = "binance.trade.btcusdt";

    /// Three-column trade schema (`event_time`, `symbol`, `price`) shared by every test.
    fn trade_schema() -> WireSchema {
        WireSchema::new(vec![
            FieldSpec::new("event_time", "i64"),
            FieldSpec::new("symbol", "string"),
            FieldSpec::new("price", "f64"),
        ])
    }

    /// Builds the fully populated descriptor used by the registry and publisher tests.
    ///
    /// Transports are added in the order WebSocket, then Tcp; the publisher tests rely on
    /// that order when they check which transport is picked without an explicit preference.
    fn build_default_descriptor() -> FeedDescriptor {
        FeedDescriptorBuilder::new(DEFAULT_FEED, trade_schema())
            .transport(TransportTag::WebSocket)
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("event_time")
            .tag("finance")
            .tag("ticks")
            .description("BTCUSDT trade stream")
            .build()
            .expect("descriptor valid")
    }

    /// Builds the smallest descriptor these tests treat as valid: one transport, one format.
    fn minimal_descriptor(name: &str) -> FeedDescriptor {
        FeedDescriptorBuilder::new(name, trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .build()
            .unwrap()
    }

    /// A descriptor without any transport must fail to build.
    #[test]
    fn descriptor_builder_validates_transports_present() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .format(FormatPreference::Arrow)
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EmptyTransports(_)));
    }

    /// An event-time key that names no schema field must fail to build.
    #[test]
    fn descriptor_builder_validates_event_time_key_in_schema() {
        let err = FeedDescriptorBuilder::new("x", trade_schema())
            .transport(TransportTag::Tcp)
            .format(FormatPreference::Arrow)
            .event_time_key("not_in_schema")
            .build()
            .unwrap_err();
        assert!(matches!(err, ValidationError::EventTimeKeyNotInSchema { .. }));
    }

    /// Registering the same feed name twice reports the offending name.
    #[test]
    fn registry_rejects_duplicate_names() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let err = reg.register(build_default_descriptor()).unwrap_err();
        assert!(matches!(
            err,
            ValidationError::DuplicateFeedName(n) if n == DEFAULT_FEED
        ));
    }

    /// The manifest lists feeds in registration order, not sorted by name.
    #[test]
    fn registry_preserves_insertion_order_in_manifest() {
        let mut reg = ManifestRegistry::new("test/0.1");
        // Deliberately non-alphabetical so a sorted manifest would fail the assertion.
        for name in ["b", "a", "c"] {
            reg.register(minimal_descriptor(name)).unwrap();
        }
        let m = reg.manifest();
        let names: Vec<&str> = m.feeds.iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, vec!["b", "a", "c"]);
    }

    /// The JSON produced by the registry parses back into a `FeedManifest`.
    #[test]
    fn registry_round_trips_through_json() {
        let mut reg = ManifestRegistry::new("test/0.1");
        reg.register(build_default_descriptor()).unwrap();
        let json = reg.manifest_json().unwrap();
        let parsed: FeedManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.feeds.len(), 1);
        assert_eq!(parsed.feeds[0].name, DEFAULT_FEED);
        assert_eq!(parsed.protocol_version, PROTOCOL_VERSION);
    }

    /// Publisher with the default descriptor registered, used by the subscribe tests.
    fn local_publisher() -> LocalPublisher {
        let mut p = LocalPublisher::new("test/0.1", Endpoint::tcp("127.0.0.1:9000"));
        p.register(build_default_descriptor()).unwrap();
        p
    }

    // NOTE(review): the `_ => unreachable!("non-exhaustive guard")` arms kept in the
    // `Ack`-expecting tests below are presumably required because `SubscribeResponse` is
    // `#[non_exhaustive]` in `live_data` — confirm against that crate. Tests that already
    // end in a catch-all `other => panic!(..)` arm need no such guard.

    /// A request with no preferences is acknowledged with Arrow over WebSocket, the first
    /// transport listed on the descriptor.
    #[test]
    fn local_publisher_accepts_valid_subscription() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                assert_eq!(ack.format, FormatPreference::Arrow);
                assert_eq!(ack.endpoint.transport, TransportTag::WebSocket);
                assert_eq!(ack.endpoint.address, "127.0.0.1:9000");
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    /// Subscribing to a feed that was never registered yields `UnknownFeed`.
    #[test]
    fn local_publisher_rejects_unknown_feed() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new("no.such.feed");
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => assert_eq!(e.code, SubscribeErrorCode::UnknownFeed),
            // Catch-all binding: covers `Ack` and any variant added later.
            other => panic!("expected unknown-feed error, got {other:?}"),
        }
    }

    /// An explicit Tcp preference overrides the default transport choice.
    #[test]
    fn local_publisher_honours_transport_preference() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Tcp));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => assert_eq!(ack.endpoint.transport, TransportTag::Tcp),
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }

    /// Requesting a transport the descriptor does not list yields `UnsupportedTransport`.
    #[test]
    fn local_publisher_rejects_unsupported_transport() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED)
            .transport_pref(TransportPreference::one(TransportTag::Quic));
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedTransport)
            }
            // Catch-all binding: covers `Ack` and any variant added later.
            other => panic!("expected unsupported-transport error, got {other:?}"),
        }
    }

    /// Requesting a column that is not in the schema yields `UnsupportedCapability`.
    #[test]
    fn local_publisher_rejects_unknown_column() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["no_such_column"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Err(e) => {
                assert_eq!(e.code, SubscribeErrorCode::UnsupportedCapability)
            }
            // Catch-all binding: covers `Ack` and any variant added later.
            other => panic!("expected unsupported-capability error, got {other:?}"),
        }
    }

    /// A projection onto schema columns is echoed back in the acknowledgement, in request order.
    #[test]
    fn local_publisher_accepts_known_columns() {
        let p = local_publisher();
        let req = SubscriptionDescriptor::new(DEFAULT_FEED).columns(["event_time", "price"]);
        match p.handle_subscribe(&req) {
            SubscribeResponse::Ack(ack) => {
                let cols = ack.negotiated_columns.expect("columns present");
                assert_eq!(cols, vec!["event_time".to_string(), "price".to_string()]);
            }
            SubscribeResponse::Err(e) => panic!("unexpected error: {e:?}"),
            _ => unreachable!("non-exhaustive guard"),
        }
    }
}