nominal_streaming/
lib.rs

/*!
`nominal-streaming` is a crate for streaming data into [Nominal Core](https://nominal.io/products/core).

The library aims to balance three concerns:

1. Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
2. Writes should fall back to disk if there are network failures.
3. Backpressure should be applied to incoming requests when network throughput is saturated.

This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended to protect against network failures).
It also provides configuration to manage the trade-offs between the concerns listed above.

<div class="warning">
This library is still under active development and may introduce breaking changes.
</div>

## Conceptual overview

Data is sent to a [Stream](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalDatasetStream.html) via a Writer.
For example:

- A file stream is constructed as:

  ```rust,no_run
  use nominal_streaming::stream::NominalDatasetStreamBuilder;

  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_file("my_data.avro")
      .build();
  ```

- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .with_file_fallback("fallback.avro")
      .build();
  ```

- Or, you can build a stream that sends data to Nominal Core *and* to a file:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .stream_to_file("my_data.avro")
      .build();
  ```

(See below for a [full example](#example-streaming-from-memory-to-nominal-core-with-file-fallback) that also shows how to create the `token`, `dataset_rid`, and `handle` values used above.)

Once we have a Stream, we can construct a Writer and send values to it:

```rust,ignore
let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(&channel_descriptor);

// Stream a single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push(start_time, value);
```

Here, we are enqueuing data onto the channel `channel_1`, tagged with `experiment_id = "123"`.
These names and values are, of course, just examples, and you can choose your own.
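
Writers exist for the other point types as well: `string_writer`, `integer_writer`, and `uint64_writer` behave just like `double_writer`. A brief sketch, using a separate channel per point type as the crate's own tests do (the channel names here are illustrative):

```rust,ignore
let status = ChannelDescriptor::with_tags("status", [("experiment_id", "123")]);
let mut string_writer = stream.string_writer(&status);
string_writer.push(UNIX_EPOCH.elapsed().unwrap(), "running".to_string());

let counter = ChannelDescriptor::with_tags("counter", [("experiment_id", "123")]);
let mut integer_writer = stream.integer_writer(&counter);
integer_writer.push(UNIX_EPOCH.elapsed().unwrap(), 42);
```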

## Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a [Nominal Dataset](https://docs.nominal.io/core/sdk/python-client/streaming/overview#streaming-data-to-a-dataset).
If the upload fails (say, because of network errors), we'd like the data to be written to an Avro file instead.

Note that we set up the async [Tokio runtime](https://tokio.rs/), since that is required by the underlying [`NominalCoreConsumer`](https://docs.rs/nominal-streaming/latest/nominal_streaming/consumer/struct.NominalCoreConsumer.html).

```rust,no_run
use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;

static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here

fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}

async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(&channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
}
```
128
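Note that dropping a writer flushes its points into the stream, and dropping the stream flushes any outstanding requests; the crate's own tests rely on this:

```rust,ignore
drop(writer); // flush points to stream
drop(stream); // flush stream to nominal
```
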
## Additional configuration

### Stream options

The examples above all use the default stream options, [`NominalStreamOpts::default`](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalStreamOpts.html).
The following stream options can be set using `.with_options(...)` on the StreamBuilder:

```text
NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
  base_api_url: String,
}
```
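
For example, a minimal sketch that increases the batch size while keeping the other defaults (the specific values here are illustrative, not recommendations):

```rust,ignore
use std::time::Duration;

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_options(NominalStreamOpts {
        max_points_per_record: 5_000,
        max_request_delay: Duration::from_millis(250),
        ..NominalStreamOpts::default()
    })
    .build();
```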

### Logging errors

Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the StreamBuilder by using `.enable_logging()`:

```rust,ignore
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();
```
*/

pub mod client;
pub mod consumer;
pub mod listener;
pub mod stream;
pub mod types;
pub mod upload;

pub use nominal_api as api;

/// This includes the most common types in this crate, re-exported for your convenience.
pub mod prelude {
    pub use conjure_object::BearerToken;
    pub use conjure_object::ResourceIdentifier;
    pub use nominal_api::tonic::google::protobuf::Timestamp;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

    pub use crate::consumer::NominalCoreConsumer;
    pub use crate::stream::NominalDatasetStream;
    #[expect(deprecated)]
    pub use crate::stream::NominalDatasourceStream;
    pub use crate::stream::NominalStreamOpts;
    pub use crate::types::AuthProvider;
    pub use crate::types::ChannelDescriptor;
    pub use crate::types::IntoTimestamp;
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    #[test_log::test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        seconds: start_time.as_secs() as i64,
                        nanos: start_time.subsec_nanos() as i32 + i,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_records value, not the
        // max request delay
        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::new();
            let mut strings = Vec::new();
            let mut ints = Vec::new();
            let mut uints = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: format!("{}", i % 50),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                });
                uints.push(Uint64Point {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as u64,
                })
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
                uints,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_records value, not the
        // max request delay
        assert_eq!(requests.len(), 20);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();
        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collect() overwrites into a single request
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
    }

    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        dbg!(&requests);
        assert_eq!(requests.len(), 2);
    }

    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let cd4 = ChannelDescriptor::new("uint64");
        let mut double_writer = stream.double_writer(&cd1);
        let mut string_writer = stream.string_writer(&cd2);
        let mut integer_writer = stream.integer_writer(&cd3);
        let mut uint64_writer = stream.uint64_writer(&cd4);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, format!("{}", value));
            integer_writer.push(start_time, value);
            uint64_writer.push(start_time, value as u64);
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(uint64_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 20);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
            panic!("invalid uint64 points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collect() overwrites into a single request
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
        assert_eq!(up.points.len(), 1000);
    }
}