/*!
`nominal-streaming` is a crate for streaming data into [Nominal Core](https://nominal.io/products/core).

The library aims to balance three concerns:

1. Data should exist in memory only for a limited, configurable amount of time before it's sent to Core.
2. Writes should fall back to disk if there are network failures.
3. Backpressure should be applied to incoming requests when network throughput is saturated.

This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended, to protect against network failures).
It also provides configuration to manage the tradeoff between the concerns listed above.

<div class="warning">
This library is still under active development and may introduce breaking changes.
</div>

## Conceptual overview

Data is sent to a [Stream](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalDatasetStream.html) via a Writer.
For example:

- A file stream is constructed as:

  ```rust,no_run
  use nominal_streaming::stream::NominalDatasetStreamBuilder;

  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_file("my_data.avro")
      .build();
  ```

- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .with_file_fallback("fallback.avro")
      .build();
  ```

- Or, you can build a stream that sends data to Nominal Core *and* to a file:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .stream_to_file("my_data.avro")
      .build();
  ```

(See below for a [full example](#example-streaming-from-memory-to-nominal-core-with-file-fallback), which also shows how to create the `token`, `dataset_rid`, and `handle` values used above.)

Once we have a Stream, we can construct a Writer and send values to it:

```rust,ignore
let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(&channel_descriptor);

// Stream a single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push(start_time, value);
```

Here, we are enqueuing data onto the channel `"channel_1"`, tagged with `experiment_id = "123"`.
These are, of course, just example names, and you can choose your own.
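
Tags are optional. As a minimal sketch (reusing the `stream` from above), an untagged channel can be described with `ChannelDescriptor::new`:

```rust,ignore
use std::time::UNIX_EPOCH;

// An untagged channel: `ChannelDescriptor::new` takes only the channel name.
let cd = ChannelDescriptor::new("channel_1");
let mut writer = stream.double_writer(&cd);
writer.push(UNIX_EPOCH.elapsed().unwrap(), 42.0);
```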

## Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a [Nominal Dataset](https://docs.nominal.io/core/sdk/python-client/streaming/overview#streaming-data-to-a-dataset).
If the upload fails (say, because of network errors), we'd like to send the data to an Avro file instead. Note that the Avro spec does not support uint64 values, so those are stored as signed int64 values.

Note that we set up the async [Tokio runtime](https://tokio.rs/), since it is required by the underlying [`NominalCoreConsumer`](https://docs.rs/nominal-streaming/latest/nominal_streaming/consumer/struct.NominalCoreConsumer.html).

```rust,no_run
use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;

static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here

fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}

async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(&channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
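    // When `writer` and `stream` go out of scope here, any buffered points
    // are flushed: the writer flushes into the stream, and the stream flushes
    // to Nominal Core (or to the fallback file on failure).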
}
```

## Additional configuration

### Stream options

The examples above all use the default stream options, [`NominalStreamOpts::default`](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalStreamOpts.html).
The following options can be set by passing a `NominalStreamOpts` to `.with_options(...)` on the `NominalDatasetStreamBuilder`:

```text
NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
  base_api_url: String,
}
```
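
For example, a minimal sketch (the field values are illustrative, not recommendations; `token`, `dataset_rid`, and `handle` are created as in the full example above):

```rust,ignore
use std::time::Duration;
use nominal_streaming::client::PRODUCTION_API_URL;

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_options(NominalStreamOpts {
        // Flush a request once it holds 1,000 points...
        max_points_per_record: 1000,
        // ...or once 100ms have elapsed, whichever comes first.
        max_request_delay: Duration::from_millis(100),
        // How many requests may queue before backpressure is applied.
        max_buffered_requests: 2,
        request_dispatcher_tasks: 4,
        base_api_url: PRODUCTION_API_URL.to_string(),
    })
    .build();
```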

### Logging errors

Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the `NominalDatasetStreamBuilder` by using `.enable_logging()`:

```rust,ignore
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();
```
*/

pub mod client;
pub mod consumer;
pub mod listener;
pub mod stream;
pub mod types;
pub mod upload;

pub use nominal_api as api;

/// The most common types in this crate, re-exported for your convenience.
pub mod prelude {
    pub use conjure_object::BearerToken;
    pub use conjure_object::ResourceIdentifier;
    pub use nominal_api::tonic::google::protobuf::Timestamp;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

    pub use crate::consumer::NominalCoreConsumer;
    pub use crate::stream::NominalDatasetStream;
    #[expect(deprecated)]
    pub use crate::stream::NominalDatasourceStream;
    pub use crate::stream::NominalStreamOpts;
    pub use crate::types::AuthProvider;
    pub use crate::types::ChannelDescriptor;
    pub use crate::types::IntoTimestamp;
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    #[test_log::test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        seconds: start_time.as_secs() as i64,
                        nanos: start_time.subsec_nanos() as i32 + i,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_points_per_record
        // value, not the max request delay
        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::new();
            let mut strings = Vec::new();
            let mut structs = Vec::new();
            let mut ints = Vec::new();
            let mut uints = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: format!("{}", i % 50),
                });
                structs.push(StructPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    json_string: format!("{}", i % 50),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                });
                uints.push(Uint64Point {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as u64,
                })
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("struct", [("batch_id", batch.to_string())]),
                structs,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
                uints,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_points_per_record
        // value, not the max request delay
        assert_eq!(requests.len(), 25);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();
        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
            panic!("invalid struct points type");
        };

        // collecting into a HashMap keeps only the last series seen per channel,
        // so each entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
        assert_eq!(stp.points.len(), 1000);
    }

    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        dbg!(&requests);
        assert_eq!(requests.len(), 2);
    }

    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let cd4 = ChannelDescriptor::new("uint64");
        let cd5 = ChannelDescriptor::new("struct");
        let mut double_writer = stream.double_writer(&cd1);
        let mut string_writer = stream.string_writer(&cd2);
        let mut integer_writer = stream.integer_writer(&cd3);
        let mut uint64_writer = stream.uint64_writer(&cd4);
        let mut struct_writer = stream.struct_writer(&cd5);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, format!("{}", value));
            integer_writer.push(start_time, value);
            uint64_writer.push(start_time, value as u64);
            struct_writer.push(start_time, format!("{}", value));
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(uint64_writer);
        drop(struct_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 25);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
            panic!("invalid struct points type");
        };

        // collecting into a HashMap keeps only the last series seen per channel,
        // so each entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
        assert_eq!(stp.points.len(), 1000);
    }
}