nominal_streaming/
lib.rs

/*!
`nominal-streaming` is a crate for streaming data into [Nominal Core](https://nominal.io/products/core).

The library aims to balance three concerns:

1. Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
1. Writes should fall back to disk if there are network failures.
1. Backpressure should be applied to incoming requests when network throughput is saturated.
This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended, to protect against network failures).
It also provides configuration options to manage the tradeoffs between the concerns listed above.

<div class="warning">
This library is still under active development and may introduce breaking changes.
</div>

## Conceptual overview

Data is sent to a [Stream](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalDatasetStream.html) via a Writer.
For example:

- A file stream is constructed as:

  ```rust
  use nominal_streaming::stream::NominalDatasetStreamBuilder;

  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_file("my_data.avro")
      .build();
  ```

- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .with_file_fallback("fallback.avro")
      .build();
  ```

- Or, you can build a stream that sends data to Nominal Core *and* to a file:

  ```rust,ignore
  let stream = NominalDatasetStreamBuilder::new()
      .stream_to_core(token, dataset_rid, handle)
      .stream_to_file("my_data.avro")
      .build();
  ```

(See below for a [full example](#example-streaming-from-memory-to-nominal-core-with-file-fallback), which also shows how to create the `token`, `dataset_rid`, and `handle` values used above.)

Once we have a Stream, we can construct a Writer and send values to it:

```rust,ignore
let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(&channel_descriptor);

// Stream a single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push(start_time, value);
```

Here, we are enqueuing data onto the channel "channel_1", tagged with "experiment_id" set to "123".
These are, of course, just examples, and you can choose your own channel names and tags.
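
Writers buffer points internally: dropping a writer flushes its points to the stream, and dropping the stream flushes any outstanding requests (the crate's tests rely on this for deterministic flushing).

You can also bypass the writer and enqueue a batch of points directly onto the stream. Here is a minimal sketch of that path, modeled on this crate's test suite; it assumes the `stream` built above and the types re-exported from the prelude:

```rust,ignore
use std::time::UNIX_EPOCH;

let channel = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

// Build a batch of DoublePoint values with explicit protobuf timestamps.
let mut points = Vec::new();
for i in 0..100 {
    let now = UNIX_EPOCH.elapsed().unwrap();
    points.push(DoublePoint {
        timestamp: Some(Timestamp {
            seconds: now.as_secs() as i64,
            nanos: now.subsec_nanos() as i32,
        }),
        value: (i % 50) as f64,
    });
}

// Enqueue the whole batch onto the channel in a single call.
stream.enqueue(&channel, points);
```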

## Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a [Nominal Dataset](https://docs.nominal.io/core/sdk/python-client/streaming/overview#streaming-data-to-a-dataset).
If the upload fails (say, because of network errors), we'd like the data to be written to an Avro file instead.

Note that we set up the async [Tokio runtime](https://tokio.rs/), since that is required by the underlying [`NominalCoreConsumer`](https://docs.rs/nominal-streaming/latest/nominal_streaming/consumer/struct.NominalCoreConsumer.html).

```rust,no_run
use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;


static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here


fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}


async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(&channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
}
```

## Additional configuration

### Stream options

The examples above implicitly use [`NominalStreamOpts::default`](https://docs.rs/nominal-streaming/latest/nominal_streaming/stream/struct.NominalStreamOpts.html).
The following stream options can be set explicitly using `.with_options(...)` on the StreamBuilder:

```text
NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
  base_api_url: String,
}
```
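
For example, the defaults can be overridden by passing a struct literal to `.with_options(...)`. A sketch, continuing the Core example above; the struct-literal form, the field values, and the `PRODUCTION_API_URL` constant follow this crate's test setup, and the comments are inferred from the field names:

```rust,ignore
use std::time::Duration;

use nominal_streaming::client::PRODUCTION_API_URL;

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_options(NominalStreamOpts {
        // Flush a request once it accumulates this many points...
        max_points_per_record: 1000,
        // ...or once this much time has elapsed, whichever comes first.
        max_request_delay: Duration::from_millis(100),
        // Cap on pending requests before backpressure is applied.
        max_buffered_requests: 2,
        // Number of tasks dispatching requests to Nominal Core.
        request_dispatcher_tasks: 4,
        base_api_url: PRODUCTION_API_URL.to_string(),
    })
    .build();
```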

### Logging errors

Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the StreamBuilder by using `.enable_logging()`:

```rust,ignore
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();
```
*/

pub mod client;
pub mod consumer;
pub mod listener;
pub mod stream;
mod types;
pub mod upload;

/// This includes the most common types in this crate, re-exported for your convenience.
pub mod prelude {
    pub use conjure_object::BearerToken;
    pub use conjure_object::ResourceIdentifier;
    pub use nominal_api::tonic::google::protobuf::Timestamp;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;

    pub use crate::consumer::NominalCoreConsumer;
    pub use crate::stream::NominalDatasetStream;
    #[expect(deprecated)]
    pub use crate::stream::NominalDatasourceStream;
    pub use crate::stream::NominalStreamOpts;
    pub use crate::types::AuthProvider;
    pub use crate::types::ChannelDescriptor;
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;
    use crate::types::IntoTimestamp;

    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    #[test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        seconds: start_time.as_secs() as i64,
                        nanos: start_time.subsec_nanos() as i32 + i,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_points_per_record
        // value, not the max request delay
        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::new();
            let mut strings = Vec::new();
            let mut ints = Vec::new();
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: format!("{}", i % 50),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the max_points_per_record
        // value, not the max request delay
        assert_eq!(requests.len(), 15);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();
        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collecting into a HashMap keeps only the last series per channel name,
        // so each entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
    }

    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }

    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        dbg!(&requests);
        assert_eq!(requests.len(), 2);
    }

    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let mut double_writer = stream.double_writer(&cd1);
        let mut string_writer = stream.string_writer(&cd2);
        let mut integer_writer = stream.integer_writer(&cd3);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, format!("{}", value));
            integer_writer.push(start_time, value);
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 15);

        let r = requests
            .iter()
            .flat_map(|r| r.series.clone())
            .map(|s| {
                (
                    s.channel.unwrap().name,
                    s.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collecting into a HashMap keeps only the last series per channel name,
        // so each entry holds a single request's worth of points
        assert_eq!(dp.points.len(), 1000);
        assert_eq!(sp.points.len(), 1000);
        assert_eq!(ip.points.len(), 1000);
    }
}