nominal_streaming/
lib.rs

/*!
`nominal-streaming` is a crate for streaming data into [Nominal Core](https://nominal.io/products/core).

The library aims to balance three concerns:

1. Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
1. Writes should fall back to disk if there are network failures.
1. Backpressure should be applied to incoming requests when network throughput is saturated.

This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended, to protect against network failures).
It also provides configuration to manage the trade-offs between the concerns listed above.

<div class="warning">
This library is still under active development and may make breaking changes.
</div>

## Conceptual overview

Data is sent to a [Stream](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalDatasetStream.html) via a Writer.
For example:

- A file stream is constructed as:

  ```rust,no_run
  use nominal_streaming::stream::NominalDatasetStreamBuilder;

  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_file("my_data.avro")
      .build();
  ```

- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .with_file_fallback("fallback.avro")
      .build();
  ```

- Or, you can build a stream that sends data to Nominal Core *and* to a file:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .stream_to_file("my_data.avro")
      .build();
  ```

(See below for a [full example](#example-streaming-from-memory-to-nominal-core-with-file-fallback), which also shows how to create the `token`, `dataset_rid`, and `handle` values used above.)

Once we have a Stream, we can construct a Writer and send values to it:

```rust,ignore
let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(&channel_descriptor);

// Stream a single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push(start_time, value);
```

Here, we are enqueuing data onto the channel "channel_1", tagged with "experiment_id" = "123".
These are, of course, just examples, and you can choose your own channel names and tags.
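
Besides the typed writers, batches of points can also be enqueued onto the stream directly.
The sketch below reuses the `stream` and `channel_descriptor` values from above; `DoublePoint` and `Timestamp` come from this crate's prelude, and this is how the crate's own tests feed data in:

```rust,ignore
// Enqueue a batch of points directly, bypassing the writer
let start_time = UNIX_EPOCH.elapsed().unwrap();
let points = vec![DoublePoint {
    timestamp: Some(Timestamp {
        seconds: start_time.as_secs() as i64,
        nanos: start_time.subsec_nanos() as i32,
    }),
    value: 42.0,
}];
stream.enqueue(&channel_descriptor, points);
```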

## Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a [Nominal Dataset](https://docs.nominal.io/core/sdk/python-client/streaming/overview#streaming-data-to-a-dataset).
If the upload fails (say, because of network errors), we'd like to write the data to an AVRO file instead.

Note that we set up the async [Tokio runtime](https://tokio.rs/), since that is required by the underlying [`NominalCoreConsumer`](https://docs.rs/nominal-streaming/latest/nominal_streaming/consumer/struct.NominalCoreConsumer.html).

```rust,no_run
use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;


static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here


fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}


async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(&channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
}
```
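
Note that flushing is tied to `Drop`: dropping a writer flushes its buffered points into the stream, and dropping the stream flushes any pending requests (this is also how the crate's tests ensure all data has been consumed before asserting):

```rust,ignore
drop(writer); // flush buffered points to the stream
drop(stream); // flush pending requests to Nominal Core (or the fallback file)
```

In the example above this happens implicitly when `async_main` returns.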

## Additional configuration

### Stream options

The examples above all use the default stream options ([`NominalStreamOpts::default`](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalStreamOpts.html)).
The following stream options can be set using `.with_options(...)` on the StreamBuilder:

```text
NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
  base_api_url: String,
}
```
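
For instance, to trade a little latency for larger batches, you might override a couple of fields and keep the defaults for the rest (`NominalStreamOpts` implements `Default`; the values below are purely illustrative, not recommendations):

```rust,ignore
use std::time::Duration;

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_file("my_data.avro")
    .with_options(NominalStreamOpts {
        max_points_per_record: 10_000,                 // flush once 10k points are buffered...
        max_request_delay: Duration::from_millis(250), // ...or after 250 ms, whichever comes first
        ..NominalStreamOpts::default()
    })
    .build();
```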

### Logging errors

Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the StreamBuilder by using `.enable_logging()`:

```rust,ignore
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();
```
*/

pub mod client;
pub mod consumer;
pub mod listener;
pub mod stream;
mod types;
pub mod upload;

pub use nominal_api as api;

/// This includes the most common types in this crate, re-exported for your convenience.
pub mod prelude {
    pub use conjure_object::BearerToken;
    pub use conjure_object::ResourceIdentifier;
    pub use nominal_api::tonic::google::protobuf::Timestamp;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

    pub use crate::consumer::NominalCoreConsumer;
    pub use crate::stream::NominalDatasetStream;
    #[expect(deprecated)]
    pub use crate::stream::NominalDatasourceStream;
    pub use crate::stream::NominalStreamOpts;
    pub use crate::types::AuthProvider;
    pub use crate::types::ChannelDescriptor;
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;
    use crate::types::IntoTimestamp;

    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    #[test_log::test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        seconds: start_time.as_secs() as i64,
                        nanos: start_time.subsec_nanos() as i32 + i,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on max_points_per_record, not
        // max_request_delay
        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::new();
            let mut strings = Vec::new();
            let mut ints = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: format!("{}", i % 50),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                })
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on max_points_per_record, not
        // max_request_delay
        assert_eq!(requests.len(), 15);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();
        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collect() into a HashMap keeps only the last series per channel, so each
        // entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
    }

    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        dbg!(&requests);
        assert_eq!(requests.len(), 2);
    }

    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let mut double_writer = stream.double_writer(&cd1);
        let mut string_writer = stream.string_writer(&cd2);
        let mut integer_writer = stream.integer_writer(&cd3);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, format!("{}", value));
            integer_writer.push(start_time, value);
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 15);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collect() into a HashMap keeps only the last series per channel, so each
        // entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
    }
}