1pub mod client;
159pub mod consumer;
160pub mod listener;
161pub mod stream;
162mod types;
163pub mod upload;
164
165pub use nominal_api as api;
166
167pub mod prelude {
169 pub use conjure_object::BearerToken;
170 pub use conjure_object::ResourceIdentifier;
171 pub use nominal_api::tonic::google::protobuf::Timestamp;
172 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
173 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
174 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
175 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
176 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
177 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
178 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
179 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
180 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
181
182 pub use crate::consumer::NominalCoreConsumer;
183 pub use crate::stream::NominalDatasetStream;
184 #[expect(deprecated)]
185 pub use crate::stream::NominalDatasourceStream;
186 pub use crate::stream::NominalStreamOpts;
187 pub use crate::types::AuthProvider;
188 pub use crate::types::ChannelDescriptor;
189}
190
191#[cfg(test)]
192mod tests {
193 use std::collections::HashMap;
194 use std::sync::Arc;
195 use std::sync::Mutex;
196 use std::thread;
197 use std::time::Duration;
198 use std::time::UNIX_EPOCH;
199
200 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
201
202 use crate::client::PRODUCTION_API_URL;
203 use crate::consumer::ConsumerResult;
204 use crate::consumer::WriteRequestConsumer;
205 use crate::prelude::*;
206 use crate::types::IntoTimestamp;
207
208 #[derive(Debug)]
209 struct TestDatasourceStream {
210 requests: Mutex<Vec<WriteRequestNominal>>,
211 }
212
213 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
214 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
215 self.requests.lock().unwrap().push(request.clone());
216 Ok(())
217 }
218 }
219
220 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
221 let test_consumer = Arc::new(TestDatasourceStream {
222 requests: Mutex::new(vec![]),
223 });
224 let stream = NominalDatasetStream::new_with_consumer(
225 test_consumer.clone(),
226 NominalStreamOpts {
227 max_points_per_record: 1000,
228 max_request_delay: Duration::from_millis(100),
229 max_buffered_requests: 2,
230 request_dispatcher_tasks: 4,
231 base_api_url: PRODUCTION_API_URL.to_string(),
232 },
233 );
234
235 (test_consumer, stream)
236 }
237
238 #[test_log::test]
239 fn test_stream() {
240 let (test_consumer, stream) = create_test_stream();
241
242 for batch in 0..5 {
243 let mut points = Vec::new();
244 for i in 0..1000 {
245 let start_time = UNIX_EPOCH.elapsed().unwrap();
246 points.push(DoublePoint {
247 timestamp: Some(Timestamp {
248 seconds: start_time.as_secs() as i64,
249 nanos: start_time.subsec_nanos() as i32 + i,
250 }),
251 value: (i % 50) as f64,
252 });
253 }
254
255 stream.enqueue(
256 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
257 points,
258 );
259 }
260
261 drop(stream); let requests = test_consumer.requests.lock().unwrap();
264
265 assert_eq!(requests.len(), 5);
268 let series = requests.first().unwrap().series.first().unwrap();
269 if let Some(PointsType::DoublePoints(points)) =
270 series.points.as_ref().unwrap().points_type.as_ref()
271 {
272 assert_eq!(points.points.len(), 1000);
273 } else {
274 panic!("unexpected data type");
275 }
276 }
277
278 #[test_log::test]
279 fn test_stream_types() {
280 let (test_consumer, stream) = create_test_stream();
281
282 for batch in 0..5 {
283 let mut doubles = Vec::new();
284 let mut strings = Vec::new();
285 let mut ints = Vec::new();
286 for i in 0..1000 {
287 let start_time = UNIX_EPOCH.elapsed().unwrap();
288 doubles.push(DoublePoint {
289 timestamp: Some(start_time.into_timestamp()),
290 value: (i % 50) as f64,
291 });
292 strings.push(StringPoint {
293 timestamp: Some(start_time.into_timestamp()),
294 value: format!("{}", i % 50),
295 });
296 ints.push(IntegerPoint {
297 timestamp: Some(start_time.into_timestamp()),
298 value: i % 50,
299 })
300 }
301
302 stream.enqueue(
303 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
304 doubles,
305 );
306 stream.enqueue(
307 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
308 strings,
309 );
310 stream.enqueue(
311 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
312 ints,
313 );
314 }
315
316 drop(stream); let requests = test_consumer.requests.lock().unwrap();
319
320 assert_eq!(requests.len(), 15);
323
324 let r = requests
325 .iter()
326 .flat_map(|r| r.series.clone())
327 .map(|s| {
328 (
329 s.channel.unwrap().name,
330 s.points.unwrap().points_type.unwrap(),
331 )
332 })
333 .collect::<HashMap<_, _>>();
334 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
335 panic!("invalid double points type");
336 };
337
338 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
339 panic!("invalid int points type");
340 };
341
342 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
343 panic!("invalid string points type");
344 };
345
346 assert_eq!(dp.points.len(), 1000);
348 assert_eq!(sp.points.len(), 1000);
349 assert_eq!(ip.points.len(), 1000);
350 }
351
352 #[test_log::test]
353 fn test_writer() {
354 let (test_consumer, stream) = create_test_stream();
355
356 let cd = ChannelDescriptor::new("channel_1");
357 let mut writer = stream.double_writer(&cd);
358
359 for i in 0..5000 {
360 let start_time = UNIX_EPOCH.elapsed().unwrap();
361 let value = i % 50;
362 writer.push(start_time, value as f64);
363 }
364
365 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
369
370 assert_eq!(requests.len(), 5);
371 let series = requests.first().unwrap().series.first().unwrap();
372 if let Some(PointsType::DoublePoints(points)) =
373 series.points.as_ref().unwrap().points_type.as_ref()
374 {
375 assert_eq!(points.points.len(), 1000);
376 } else {
377 panic!("unexpected data type");
378 }
379 }
380
381 #[test_log::test]
382 fn test_time_flush() {
383 let (test_consumer, stream) = create_test_stream();
384
385 let cd = ChannelDescriptor::new("channel_1");
386 let mut writer = stream.double_writer(&cd);
387
388 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
389 thread::sleep(Duration::from_millis(101));
390 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
392 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
395 drop(stream);
396
397 let requests = test_consumer.requests.lock().unwrap();
398 dbg!(&requests);
399 assert_eq!(requests.len(), 2);
400 }
401
402 #[test_log::test]
403 fn test_writer_types() {
404 let (test_consumer, stream) = create_test_stream();
405
406 let cd1 = ChannelDescriptor::new("double");
407 let cd2 = ChannelDescriptor::new("string");
408 let cd3 = ChannelDescriptor::new("int");
409 let mut double_writer = stream.double_writer(&cd1);
410 let mut string_writer = stream.string_writer(&cd2);
411 let mut integer_writer = stream.integer_writer(&cd3);
412
413 for i in 0..5000 {
414 let start_time = UNIX_EPOCH.elapsed().unwrap();
415 let value = i % 50;
416 double_writer.push(start_time, value as f64);
417 string_writer.push(start_time, format!("{}", value));
418 integer_writer.push(start_time, value);
419 }
420
421 drop(double_writer);
422 drop(string_writer);
423 drop(integer_writer);
424 drop(stream);
425
426 let requests = test_consumer.requests.lock().unwrap();
427
428 assert_eq!(requests.len(), 15);
429
430 let r = requests
431 .iter()
432 .flat_map(|r| r.series.clone())
433 .map(|s| {
434 (
435 s.channel.unwrap().name,
436 s.points.unwrap().points_type.unwrap(),
437 )
438 })
439 .collect::<HashMap<_, _>>();
440
441 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
442 panic!("invalid double points type");
443 };
444
445 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
446 panic!("invalid int points type");
447 };
448
449 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
450 panic!("invalid string points type");
451 };
452
453 assert_eq!(dp.points.len(), 1000);
455 assert_eq!(sp.points.len(), 1000);
456 assert_eq!(ip.points.len(), 1000);
457 }
458}