/*!
`nominal-streaming` is a crate for streaming data into [Nominal Core](https://nominal.io/products/core).

The library aims to balance three concerns:

1. Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
2. Writes should fall back to disk if there are network failures.
3. Backpressure should be applied to incoming requests when network throughput is saturated.

This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended to protect against network failures).
It also provides configuration options to manage the trade-offs between the concerns listed above.

<div class="warning">
This library is still under active development and may introduce breaking changes.
</div>

## Conceptual overview

Data is sent to a [Stream](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalDatasetStream.html) via a Writer.
For example:

- A file stream is constructed as:

  ```rust,no_run
  use nominal_streaming::stream::NominalDatasetStreamBuilder;

  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_file("my_data.avro")
      .build();
  ```

- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .with_file_fallback("fallback.avro")
      .build();
  ```

- Or, you can build a stream that sends data to Nominal Core *and* to a file:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .stream_to_file("my_data.avro")
      .build();
  ```

(See the [full example](#example-streaming-from-memory-to-nominal-core-with-file-fallback) below, which also shows how to create the `token`, `dataset_rid`, and `handle` values used above.)

Once we have a Stream, we can construct a Writer and send values to it:

```rust,ignore
let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(channel_descriptor);

// Stream a single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push(start_time, value);
```

Here, we are enqueuing data onto the channel `channel_1`, tagged with `experiment_id = "123"`.
These names and values are, of course, just examples; you can choose your own.

## Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a [Nominal Dataset](https://docs.nominal.io/core/sdk/python-client/streaming/overview#streaming-data-to-a-dataset).
If the upload fails (say because of network errors), we'd like to instead send the data to an Avro file. Note that the Avro spec does not support uint64 values, so those will be stored as signed int64 values.

Note that we set up the async [Tokio runtime](https://tokio.rs/), since that is required by the underlying [`NominalCoreConsumer`](https://docs.rs/nominal-streaming/latest/nominal_streaming/consumer/struct.NominalCoreConsumer.html).

```rust,no_run
use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;


static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here


fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}


async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
}
```

## Additional configuration

### Stream options

The examples above use the default stream options, [`NominalStreamOpts::default`](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalStreamOpts.html).
The following stream options can be set using `.with_options(...)` on the stream builder:

```text
NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
  base_api_url: String,
}
```
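
For instance, a stream that batches up to 500 points per request and flushes at least once per second might be configured as follows. This is a sketch using the fields listed above; the values are illustrative only, and it assumes `NominalStreamOpts` can be filled in from its `Default` with struct-update syntax:

```rust,ignore
use std::time::Duration;

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_options(NominalStreamOpts {
        max_points_per_record: 500,                // flush after 500 buffered points...
        max_request_delay: Duration::from_secs(1), // ...or after 1s, whichever comes first
        max_buffered_requests: 2,
        request_dispatcher_tasks: 4,
        ..NominalStreamOpts::default()
    })
    .build();
```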

### Logging errors

When things go wrong, we usually want some form of reporting. You can enable debug logging on the stream builder with `.enable_logging()`:

```rust,ignore
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();
```
*/

pub mod client;
pub mod consumer;
pub mod listener;
pub mod stream;
pub mod types;
pub mod upload;

pub use nominal_api as api;

/// The most common types in this crate, re-exported for your convenience.
pub mod prelude {
    pub use conjure_object::BearerToken;
    pub use conjure_object::ResourceIdentifier;
    pub use nominal_api::tonic::google::protobuf::Timestamp;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

    pub use crate::consumer::NominalCoreConsumer;
    pub use crate::stream::NominalDatasetStream;
    #[expect(deprecated)]
    pub use crate::stream::NominalDatasourceStream;
    pub use crate::stream::NominalStreamOpts;
    pub use crate::types::AuthProvider;
    pub use crate::types::ChannelDescriptor;
    pub use crate::types::IntoTimestamp;
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
    use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    #[test_log::test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        seconds: start_time.as_secs() as i64,
                        nanos: start_time.subsec_nanos() as i32 + i,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on max_points_per_record,
        // not on max_request_delay
        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::new();
            let mut strings = Vec::new();
            let mut structs = Vec::new();
            let mut ints = Vec::new();
            let mut uints = Vec::new();
            let mut double_arrays = Vec::new();
            let mut string_arrays = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: format!("{}", i % 50),
                });
                structs.push(StructPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    json_string: format!("{{\"v\":{}}}", i % 50),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                });
                uints.push(Uint64Point {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as u64,
                });
                double_arrays.push(DoubleArrayPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: vec![(i % 50) as f64, ((i + 1) % 50) as f64],
                });
                string_arrays.push(StringArrayPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: vec![format!("{}", i % 50)],
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("struct", [("batch_id", batch.to_string())]),
                structs,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
                uints,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("double_array", [("batch_id", batch.to_string())]),
                double_arrays,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string_array", [("batch_id", batch.to_string())]),
                string_arrays,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on max_points_per_record,
        // not on max_request_delay
        assert_eq!(requests.len(), 35);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();
        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
            panic!("invalid struct points type");
        };

        let PointsType::ArrayPoints(ArrayPoints {
            array_type: Some(ArrayType::DoubleArrayPoints(dap)),
        }) = r.get("double_array").unwrap()
        else {
            panic!("invalid double array points type");
        };

        let PointsType::ArrayPoints(ArrayPoints {
            array_type: Some(ArrayType::StringArrayPoints(sap)),
        }) = r.get("string_array").unwrap()
        else {
            panic!("invalid string array points type");
        };

        // collect() overwrites into a single request per channel
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
        assert_eq!(stp.points.len(), 1000);
        assert_eq!(dap.points.len(), 1000);
        assert_eq!(sap.points.len(), 1000);
    }

    #[test_log::test]
    #[should_panic(expected = "mismatched types")]
    fn test_mismatched_array_types_panics() {
        // Protects the exhaustive match in SeriesBufferGuard::extend from being
        // silently simplified to a catch-all: pushing a DoubleArray and then a
        // StringArray to the same channel must panic at buffer merge time.
        //
        // ManuallyDrop skips NominalDatasetStream::drop during unwind — the panic
        // leaves unflushed_points non-zero, which would cause drop to spin forever.
        let (_test_consumer, stream) = create_test_stream();
        let stream = std::mem::ManuallyDrop::new(stream);
        let cd = ChannelDescriptor::new("mixed_array");

        let ts = UNIX_EPOCH.elapsed().unwrap().into_timestamp();

        stream.enqueue(
            &cd,
            vec![DoubleArrayPoint {
                timestamp: Some(ts),
                value: vec![1.0, 2.0],
            }],
        );
        stream.enqueue(
            &cd,
            vec![StringArrayPoint {
                timestamp: Some(ts),
                value: vec!["a".into()],
            }],
        );
    }

    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        dbg!(&requests);
        assert_eq!(requests.len(), 2);
    }

    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let cd4 = ChannelDescriptor::new("uint64");
        let cd5 = ChannelDescriptor::new("struct");
        let cd6 = ChannelDescriptor::new("double_array");
        let cd7 = ChannelDescriptor::new("string_array");
        let mut double_writer = stream.double_writer(cd1);
        let mut string_writer = stream.string_writer(cd2);
        let mut integer_writer = stream.integer_writer(cd3);
        let mut uint64_writer = stream.uint64_writer(cd4);
        let mut struct_writer = stream.struct_writer(cd5);
        let mut double_array_writer = stream.double_array_writer(cd6);
        let mut string_array_writer = stream.string_array_writer(cd7);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, format!("{}", value));
            integer_writer.push(start_time, value);
            uint64_writer.push(start_time, value as u64);
            struct_writer.push(start_time, format!("{{\"v\":{}}}", value));
            double_array_writer.push(start_time, vec![value as f64, (value + 1) as f64]);
            string_array_writer.push(start_time, vec![format!("{}", value)]);
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(uint64_writer);
        drop(struct_writer);
        drop(double_array_writer);
        drop(string_array_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 35);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
            panic!("invalid struct points type");
        };

        let PointsType::ArrayPoints(ArrayPoints {
            array_type: Some(ArrayType::DoubleArrayPoints(dap)),
        }) = r.get("double_array").unwrap()
        else {
            panic!("invalid double array points type");
        };

        let PointsType::ArrayPoints(ArrayPoints {
            array_type: Some(ArrayType::StringArrayPoints(sap)),
        }) = r.get("string_array").unwrap()
        else {
            panic!("invalid string array points type");
        };

        // collect() overwrites into a single request per channel
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
        assert_eq!(stp.points.len(), 1000);
        assert_eq!(dap.points.len(), 1000);
        assert_eq!(sap.points.len(), 1000);
    }
}