1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6pub mod upload;
7
8pub mod prelude {
10 pub use conjure_object::BearerToken;
11 pub use conjure_object::ResourceIdentifier;
12 pub use nominal_api::tonic::google::protobuf::Timestamp;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
18 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
19 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
21 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
22
23 pub use crate::consumer::NominalCoreConsumer;
24 pub use crate::stream::NominalDatasetStream;
25 pub use crate::stream::NominalDatasourceStream;
26 pub use crate::stream::NominalStreamOpts;
27 pub use crate::types::AuthProvider;
28 pub use crate::types::ChannelDescriptor;
29}
30
31#[cfg(test)]
32mod tests {
33 use std::collections::HashMap;
34 use std::sync::Arc;
35 use std::sync::Mutex;
36 use std::thread;
37 use std::time::Duration;
38 use std::time::UNIX_EPOCH;
39
40 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
41
42 use crate::client::PRODUCTION_API_URL;
43 use crate::consumer::ConsumerResult;
44 use crate::consumer::WriteRequestConsumer;
45 use crate::prelude::*;
46 use crate::types::IntoTimestamp;
47
48 #[derive(Debug)]
49 struct TestDatasourceStream {
50 requests: Mutex<Vec<WriteRequestNominal>>,
51 }
52
53 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
54 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
55 self.requests.lock().unwrap().push(request.clone());
56 Ok(())
57 }
58 }
59
60 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
61 let test_consumer = Arc::new(TestDatasourceStream {
62 requests: Mutex::new(vec![]),
63 });
64 let stream = NominalDatasetStream::new_with_consumer(
65 test_consumer.clone(),
66 NominalStreamOpts {
67 max_points_per_record: 1000,
68 max_request_delay: Duration::from_millis(100),
69 max_buffered_requests: 2,
70 request_dispatcher_tasks: 4,
71 base_api_url: PRODUCTION_API_URL.to_string(),
72 },
73 );
74
75 (test_consumer, stream)
76 }
77
78 #[test]
79 fn test_stream() {
80 let (test_consumer, stream) = create_test_stream();
81
82 for batch in 0..5 {
83 let mut points = Vec::new();
84 for i in 0..1000 {
85 let start_time = UNIX_EPOCH.elapsed().unwrap();
86 points.push(DoublePoint {
87 timestamp: Some(Timestamp {
88 seconds: start_time.as_secs() as i64,
89 nanos: start_time.subsec_nanos() as i32 + i,
90 }),
91 value: (i % 50) as f64,
92 });
93 }
94
95 stream.enqueue(
96 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
97 points,
98 );
99 }
100
101 drop(stream); let requests = test_consumer.requests.lock().unwrap();
104
105 assert_eq!(requests.len(), 5);
108 let series = requests.first().unwrap().series.first().unwrap();
109 if let Some(PointsType::DoublePoints(points)) =
110 series.points.as_ref().unwrap().points_type.as_ref()
111 {
112 assert_eq!(points.points.len(), 1000);
113 } else {
114 panic!("unexpected data type");
115 }
116 }
117
118 #[test]
119 fn test_stream_types() {
120 let (test_consumer, stream) = create_test_stream();
121
122 for batch in 0..5 {
123 let mut doubles = Vec::new();
124 let mut strings = Vec::new();
125 let mut ints = Vec::new();
126 for i in 0..1000 {
127 let start_time = UNIX_EPOCH.elapsed().unwrap();
128 doubles.push(DoublePoint {
129 timestamp: Some(start_time.into_timestamp()),
130 value: (i % 50) as f64,
131 });
132 strings.push(StringPoint {
133 timestamp: Some(start_time.into_timestamp()),
134 value: format!("{}", i % 50),
135 });
136 ints.push(IntegerPoint {
137 timestamp: Some(start_time.into_timestamp()),
138 value: i % 50,
139 })
140 }
141
142 stream.enqueue(
143 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
144 doubles,
145 );
146 stream.enqueue(
147 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
148 strings,
149 );
150 stream.enqueue(
151 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
152 ints,
153 );
154 }
155
156 drop(stream); let requests = test_consumer.requests.lock().unwrap();
159
160 assert_eq!(requests.len(), 15);
163
164 let r = requests
165 .iter()
166 .flat_map(|r| r.series.clone())
167 .map(|s| {
168 (
169 s.channel.unwrap().name,
170 s.points.unwrap().points_type.unwrap(),
171 )
172 })
173 .collect::<HashMap<_, _>>();
174 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
175 panic!("invalid double points type");
176 };
177
178 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
179 panic!("invalid int points type");
180 };
181
182 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
183 panic!("invalid string points type");
184 };
185
186 assert_eq!(dp.points.len(), 1000);
188 assert_eq!(sp.points.len(), 1000);
189 assert_eq!(ip.points.len(), 1000);
190 }
191
192 #[test_log::test]
193 fn test_writer() {
194 let (test_consumer, stream) = create_test_stream();
195
196 let cd = ChannelDescriptor::new("channel_1");
197 let mut writer = stream.double_writer(&cd);
198
199 for i in 0..5000 {
200 let start_time = UNIX_EPOCH.elapsed().unwrap();
201 let value = i % 50;
202 writer.push(start_time, value as f64);
203 }
204
205 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
209
210 assert_eq!(requests.len(), 5);
211 let series = requests.first().unwrap().series.first().unwrap();
212 if let Some(PointsType::DoublePoints(points)) =
213 series.points.as_ref().unwrap().points_type.as_ref()
214 {
215 assert_eq!(points.points.len(), 1000);
216 } else {
217 panic!("unexpected data type");
218 }
219 }
220
221 #[test_log::test]
222 fn test_time_flush() {
223 let (test_consumer, stream) = create_test_stream();
224
225 let cd = ChannelDescriptor::new("channel_1");
226 let mut writer = stream.double_writer(&cd);
227
228 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
229 thread::sleep(Duration::from_millis(101));
230 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
232 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
235 drop(stream);
236
237 let requests = test_consumer.requests.lock().unwrap();
238 dbg!(&requests);
239 assert_eq!(requests.len(), 2);
240 }
241
242 #[test_log::test]
243 fn test_writer_types() {
244 let (test_consumer, stream) = create_test_stream();
245
246 let cd1 = ChannelDescriptor::new("double");
247 let cd2 = ChannelDescriptor::new("string");
248 let cd3 = ChannelDescriptor::new("int");
249 let mut double_writer = stream.double_writer(&cd1);
250 let mut string_writer = stream.string_writer(&cd2);
251 let mut integer_writer = stream.integer_writer(&cd3);
252
253 for i in 0..5000 {
254 let start_time = UNIX_EPOCH.elapsed().unwrap();
255 let value = i % 50;
256 double_writer.push(start_time, value as f64);
257 string_writer.push(start_time, format!("{}", value));
258 integer_writer.push(start_time, value);
259 }
260
261 drop(double_writer);
262 drop(string_writer);
263 drop(integer_writer);
264 drop(stream);
265
266 let requests = test_consumer.requests.lock().unwrap();
267
268 assert_eq!(requests.len(), 15);
269
270 let r = requests
271 .iter()
272 .flat_map(|r| r.series.clone())
273 .map(|s| {
274 (
275 s.channel.unwrap().name,
276 s.points.unwrap().points_type.unwrap(),
277 )
278 })
279 .collect::<HashMap<_, _>>();
280
281 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
282 panic!("invalid double points type");
283 };
284
285 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
286 panic!("invalid int points type");
287 };
288
289 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
290 panic!("invalid string points type");
291 };
292
293 assert_eq!(dp.points.len(), 1000);
295 assert_eq!(sp.points.len(), 1000);
296 assert_eq!(ip.points.len(), 1000);
297 }
298}