1pub mod client;
159pub mod consumer;
160pub mod listener;
161pub mod stream;
162mod types;
163pub mod upload;
164
165pub mod prelude {
167 pub use conjure_object::BearerToken;
168 pub use conjure_object::ResourceIdentifier;
169 pub use nominal_api::tonic::google::protobuf::Timestamp;
170 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
171 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
172 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
173 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
174 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
175 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
176 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
177 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
178 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
179
180 pub use crate::consumer::NominalCoreConsumer;
181 pub use crate::stream::NominalDatasetStream;
182 #[expect(deprecated)]
183 pub use crate::stream::NominalDatasourceStream;
184 pub use crate::stream::NominalStreamOpts;
185 pub use crate::types::AuthProvider;
186 pub use crate::types::ChannelDescriptor;
187}
188
189#[cfg(test)]
190mod tests {
191 use std::collections::HashMap;
192 use std::sync::Arc;
193 use std::sync::Mutex;
194 use std::thread;
195 use std::time::Duration;
196 use std::time::UNIX_EPOCH;
197
198 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
199
200 use crate::client::PRODUCTION_API_URL;
201 use crate::consumer::ConsumerResult;
202 use crate::consumer::WriteRequestConsumer;
203 use crate::prelude::*;
204 use crate::types::IntoTimestamp;
205
206 #[derive(Debug)]
207 struct TestDatasourceStream {
208 requests: Mutex<Vec<WriteRequestNominal>>,
209 }
210
211 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
212 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
213 self.requests.lock().unwrap().push(request.clone());
214 Ok(())
215 }
216 }
217
218 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
219 let test_consumer = Arc::new(TestDatasourceStream {
220 requests: Mutex::new(vec![]),
221 });
222 let stream = NominalDatasetStream::new_with_consumer(
223 test_consumer.clone(),
224 NominalStreamOpts {
225 max_points_per_record: 1000,
226 max_request_delay: Duration::from_millis(100),
227 max_buffered_requests: 2,
228 request_dispatcher_tasks: 4,
229 base_api_url: PRODUCTION_API_URL.to_string(),
230 },
231 );
232
233 (test_consumer, stream)
234 }
235
236 #[test]
237 fn test_stream() {
238 let (test_consumer, stream) = create_test_stream();
239
240 for batch in 0..5 {
241 let mut points = Vec::new();
242 for i in 0..1000 {
243 let start_time = UNIX_EPOCH.elapsed().unwrap();
244 points.push(DoublePoint {
245 timestamp: Some(Timestamp {
246 seconds: start_time.as_secs() as i64,
247 nanos: start_time.subsec_nanos() as i32 + i,
248 }),
249 value: (i % 50) as f64,
250 });
251 }
252
253 stream.enqueue(
254 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
255 points,
256 );
257 }
258
259 drop(stream); let requests = test_consumer.requests.lock().unwrap();
262
263 assert_eq!(requests.len(), 5);
266 let series = requests.first().unwrap().series.first().unwrap();
267 if let Some(PointsType::DoublePoints(points)) =
268 series.points.as_ref().unwrap().points_type.as_ref()
269 {
270 assert_eq!(points.points.len(), 1000);
271 } else {
272 panic!("unexpected data type");
273 }
274 }
275
276 #[test]
277 fn test_stream_types() {
278 let (test_consumer, stream) = create_test_stream();
279
280 for batch in 0..5 {
281 let mut doubles = Vec::new();
282 let mut strings = Vec::new();
283 let mut ints = Vec::new();
284 for i in 0..1000 {
285 let start_time = UNIX_EPOCH.elapsed().unwrap();
286 doubles.push(DoublePoint {
287 timestamp: Some(start_time.into_timestamp()),
288 value: (i % 50) as f64,
289 });
290 strings.push(StringPoint {
291 timestamp: Some(start_time.into_timestamp()),
292 value: format!("{}", i % 50),
293 });
294 ints.push(IntegerPoint {
295 timestamp: Some(start_time.into_timestamp()),
296 value: i % 50,
297 })
298 }
299
300 stream.enqueue(
301 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
302 doubles,
303 );
304 stream.enqueue(
305 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
306 strings,
307 );
308 stream.enqueue(
309 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
310 ints,
311 );
312 }
313
314 drop(stream); let requests = test_consumer.requests.lock().unwrap();
317
318 assert_eq!(requests.len(), 15);
321
322 let r = requests
323 .iter()
324 .flat_map(|r| r.series.clone())
325 .map(|s| {
326 (
327 s.channel.unwrap().name,
328 s.points.unwrap().points_type.unwrap(),
329 )
330 })
331 .collect::<HashMap<_, _>>();
332 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
333 panic!("invalid double points type");
334 };
335
336 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
337 panic!("invalid int points type");
338 };
339
340 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
341 panic!("invalid string points type");
342 };
343
344 assert_eq!(dp.points.len(), 1000);
346 assert_eq!(sp.points.len(), 1000);
347 assert_eq!(ip.points.len(), 1000);
348 }
349
350 #[test_log::test]
351 fn test_writer() {
352 let (test_consumer, stream) = create_test_stream();
353
354 let cd = ChannelDescriptor::new("channel_1");
355 let mut writer = stream.double_writer(&cd);
356
357 for i in 0..5000 {
358 let start_time = UNIX_EPOCH.elapsed().unwrap();
359 let value = i % 50;
360 writer.push(start_time, value as f64);
361 }
362
363 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
367
368 assert_eq!(requests.len(), 5);
369 let series = requests.first().unwrap().series.first().unwrap();
370 if let Some(PointsType::DoublePoints(points)) =
371 series.points.as_ref().unwrap().points_type.as_ref()
372 {
373 assert_eq!(points.points.len(), 1000);
374 } else {
375 panic!("unexpected data type");
376 }
377 }
378
379 #[test_log::test]
380 fn test_time_flush() {
381 let (test_consumer, stream) = create_test_stream();
382
383 let cd = ChannelDescriptor::new("channel_1");
384 let mut writer = stream.double_writer(&cd);
385
386 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
387 thread::sleep(Duration::from_millis(101));
388 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
390 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
393 drop(stream);
394
395 let requests = test_consumer.requests.lock().unwrap();
396 dbg!(&requests);
397 assert_eq!(requests.len(), 2);
398 }
399
400 #[test_log::test]
401 fn test_writer_types() {
402 let (test_consumer, stream) = create_test_stream();
403
404 let cd1 = ChannelDescriptor::new("double");
405 let cd2 = ChannelDescriptor::new("string");
406 let cd3 = ChannelDescriptor::new("int");
407 let mut double_writer = stream.double_writer(&cd1);
408 let mut string_writer = stream.string_writer(&cd2);
409 let mut integer_writer = stream.integer_writer(&cd3);
410
411 for i in 0..5000 {
412 let start_time = UNIX_EPOCH.elapsed().unwrap();
413 let value = i % 50;
414 double_writer.push(start_time, value as f64);
415 string_writer.push(start_time, format!("{}", value));
416 integer_writer.push(start_time, value);
417 }
418
419 drop(double_writer);
420 drop(string_writer);
421 drop(integer_writer);
422 drop(stream);
423
424 let requests = test_consumer.requests.lock().unwrap();
425
426 assert_eq!(requests.len(), 15);
427
428 let r = requests
429 .iter()
430 .flat_map(|r| r.series.clone())
431 .map(|s| {
432 (
433 s.channel.unwrap().name,
434 s.points.unwrap().points_type.unwrap(),
435 )
436 })
437 .collect::<HashMap<_, _>>();
438
439 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
440 panic!("invalid double points type");
441 };
442
443 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
444 panic!("invalid int points type");
445 };
446
447 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
448 panic!("invalid string points type");
449 };
450
451 assert_eq!(dp.points.len(), 1000);
453 assert_eq!(sp.points.len(), 1000);
454 assert_eq!(ip.points.len(), 1000);
455 }
456}