1pub mod client;
159pub mod consumer;
160pub mod listener;
161pub mod stream;
162pub mod types;
163pub mod upload;
164
165pub use nominal_api as api;
166
167pub mod prelude {
169 pub use conjure_object::BearerToken;
170 pub use conjure_object::ResourceIdentifier;
171 pub use nominal_api::tonic::google::protobuf::Timestamp;
172 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
173 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
174 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
175 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
176 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
177 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
178 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
179 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
180 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
181 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
182 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
183
184 pub use crate::consumer::NominalCoreConsumer;
185 pub use crate::stream::NominalDatasetStream;
186 #[expect(deprecated)]
187 pub use crate::stream::NominalDatasourceStream;
188 pub use crate::stream::NominalStreamOpts;
189 pub use crate::types::AuthProvider;
190 pub use crate::types::ChannelDescriptor;
191 pub use crate::types::IntoTimestamp;
192}
193
194#[cfg(test)]
195mod tests {
196 use std::collections::HashMap;
197 use std::sync::Arc;
198 use std::sync::Mutex;
199 use std::thread;
200 use std::time::Duration;
201 use std::time::UNIX_EPOCH;
202
203 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
204
205 use crate::client::PRODUCTION_API_URL;
206 use crate::consumer::ConsumerResult;
207 use crate::consumer::WriteRequestConsumer;
208 use crate::prelude::*;
209
210 #[derive(Debug)]
211 struct TestDatasourceStream {
212 requests: Mutex<Vec<WriteRequestNominal>>,
213 }
214
215 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
216 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
217 self.requests.lock().unwrap().push(request.clone());
218 Ok(())
219 }
220 }
221
222 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
223 let test_consumer = Arc::new(TestDatasourceStream {
224 requests: Mutex::new(vec![]),
225 });
226 let stream = NominalDatasetStream::new_with_consumer(
227 test_consumer.clone(),
228 NominalStreamOpts {
229 max_points_per_record: 1000,
230 max_request_delay: Duration::from_millis(100),
231 max_buffered_requests: 2,
232 request_dispatcher_tasks: 4,
233 base_api_url: PRODUCTION_API_URL.to_string(),
234 },
235 );
236
237 (test_consumer, stream)
238 }
239
240 #[test_log::test]
241 fn test_stream() {
242 let (test_consumer, stream) = create_test_stream();
243
244 for batch in 0..5 {
245 let mut points = Vec::new();
246 for i in 0..1000 {
247 let start_time = UNIX_EPOCH.elapsed().unwrap();
248 points.push(DoublePoint {
249 timestamp: Some(Timestamp {
250 seconds: start_time.as_secs() as i64,
251 nanos: start_time.subsec_nanos() as i32 + i,
252 }),
253 value: (i % 50) as f64,
254 });
255 }
256
257 stream.enqueue(
258 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
259 points,
260 );
261 }
262
263 drop(stream); let requests = test_consumer.requests.lock().unwrap();
266
267 assert_eq!(requests.len(), 5);
270 let series = requests.first().unwrap().series.first().unwrap();
271 if let Some(PointsType::DoublePoints(points)) =
272 series.points.as_ref().unwrap().points_type.as_ref()
273 {
274 assert_eq!(points.points.len(), 1000);
275 } else {
276 panic!("unexpected data type");
277 }
278 }
279
280 #[test_log::test]
281 fn test_stream_types() {
282 let (test_consumer, stream) = create_test_stream();
283
284 for batch in 0..5 {
285 let mut doubles = Vec::new();
286 let mut strings = Vec::new();
287 let mut ints = Vec::new();
288 let mut uints = Vec::new();
289 for i in 0..1000 {
290 let start_time = UNIX_EPOCH.elapsed().unwrap();
291 doubles.push(DoublePoint {
292 timestamp: Some(start_time.into_timestamp()),
293 value: (i % 50) as f64,
294 });
295 strings.push(StringPoint {
296 timestamp: Some(start_time.into_timestamp()),
297 value: format!("{}", i % 50),
298 });
299 ints.push(IntegerPoint {
300 timestamp: Some(start_time.into_timestamp()),
301 value: i % 50,
302 });
303 uints.push(Uint64Point {
304 timestamp: Some(start_time.into_timestamp()),
305 value: (i % 50) as u64,
306 })
307 }
308
309 stream.enqueue(
310 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
311 doubles,
312 );
313 stream.enqueue(
314 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
315 strings,
316 );
317 stream.enqueue(
318 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
319 ints,
320 );
321 stream.enqueue(
322 &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
323 uints,
324 );
325 }
326
327 drop(stream); let requests = test_consumer.requests.lock().unwrap();
330
331 assert_eq!(requests.len(), 20);
334
335 let r = requests
336 .iter()
337 .flat_map(|r| r.series.clone())
338 .map(|s| {
339 (
340 s.channel.unwrap().name,
341 s.points.unwrap().points_type.unwrap(),
342 )
343 })
344 .collect::<HashMap<_, _>>();
345 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
346 panic!("invalid double points type");
347 };
348
349 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
350 panic!("invalid int points type");
351 };
352
353 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
354 panic!("invalid uint64 points type");
355 };
356
357 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
358 panic!("invalid string points type");
359 };
360
361 assert_eq!(dp.points.len(), 1000);
363 assert_eq!(sp.points.len(), 1000);
364 assert_eq!(ip.points.len(), 1000);
365 assert_eq!(up.points.len(), 1000);
366 }
367
368 #[test_log::test]
369 fn test_writer() {
370 let (test_consumer, stream) = create_test_stream();
371
372 let cd = ChannelDescriptor::new("channel_1");
373 let mut writer = stream.double_writer(&cd);
374
375 for i in 0..5000 {
376 let start_time = UNIX_EPOCH.elapsed().unwrap();
377 let value = i % 50;
378 writer.push(start_time, value as f64);
379 }
380
381 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
385
386 assert_eq!(requests.len(), 5);
387 let series = requests.first().unwrap().series.first().unwrap();
388 if let Some(PointsType::DoublePoints(points)) =
389 series.points.as_ref().unwrap().points_type.as_ref()
390 {
391 assert_eq!(points.points.len(), 1000);
392 } else {
393 panic!("unexpected data type");
394 }
395 }
396
397 #[test_log::test]
398 fn test_time_flush() {
399 let (test_consumer, stream) = create_test_stream();
400
401 let cd = ChannelDescriptor::new("channel_1");
402 let mut writer = stream.double_writer(&cd);
403
404 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
405 thread::sleep(Duration::from_millis(101));
406 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
408 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
411 drop(stream);
412
413 let requests = test_consumer.requests.lock().unwrap();
414 dbg!(&requests);
415 assert_eq!(requests.len(), 2);
416 }
417
418 #[test_log::test]
419 fn test_writer_types() {
420 let (test_consumer, stream) = create_test_stream();
421
422 let cd1 = ChannelDescriptor::new("double");
423 let cd2 = ChannelDescriptor::new("string");
424 let cd3 = ChannelDescriptor::new("int");
425 let cd4 = ChannelDescriptor::new("uint64");
426 let mut double_writer = stream.double_writer(&cd1);
427 let mut string_writer = stream.string_writer(&cd2);
428 let mut integer_writer = stream.integer_writer(&cd3);
429 let mut uint64_writer = stream.uint64_writer(&cd4);
430
431 for i in 0..5000 {
432 let start_time = UNIX_EPOCH.elapsed().unwrap();
433 let value = i % 50;
434 double_writer.push(start_time, value as f64);
435 string_writer.push(start_time, format!("{}", value));
436 integer_writer.push(start_time, value);
437 uint64_writer.push(start_time, value as u64);
438 }
439
440 drop(double_writer);
441 drop(string_writer);
442 drop(integer_writer);
443 drop(uint64_writer);
444 drop(stream);
445
446 let requests = test_consumer.requests.lock().unwrap();
447
448 assert_eq!(requests.len(), 20);
449
450 let r = requests
451 .iter()
452 .flat_map(|r| r.series.clone())
453 .map(|s| {
454 (
455 s.channel.unwrap().name,
456 s.points.unwrap().points_type.unwrap(),
457 )
458 })
459 .collect::<HashMap<_, _>>();
460
461 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
462 panic!("invalid double points type");
463 };
464
465 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
466 panic!("invalid int points type");
467 };
468
469 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
470 panic!("invalid uint64 points type");
471 };
472
473 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
474 panic!("invalid string points type");
475 };
476
477 assert_eq!(dp.points.len(), 1000);
479 assert_eq!(sp.points.len(), 1000);
480 assert_eq!(ip.points.len(), 1000);
481 assert_eq!(up.points.len(), 1000);
482 }
483}