Nominal Streaming
nominal-streaming is a Rust library for streaming data into Nominal Core.
The library aims to balance three concerns:
- Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
- Writes should fall back to disk if there are network failures.
- Backpressure should be applied to incoming requests when network throughput is saturated.
This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended to protect against network failures). It also provides configuration to manage the trade-off between the concerns listed above.
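To make the backpressure concern concrete, here is a minimal sketch that uses only the Rust standard library. It is not how nominal-streaming is implemented; the helper `enqueue_until_full` is a hypothetical name introduced for illustration. It shows the general idea: a bounded queue refuses new points once it is full, so the producer finds out that it must slow down instead of letting memory grow without limit.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper (not part of nominal-streaming): tries to enqueue
/// `points` into a bounded queue that nobody is draining, and returns how
/// many points were accepted before the queue pushed back.
fn enqueue_until_full(capacity: usize, points: &[f64]) -> usize {
    // `_rx` is kept alive so the channel stays connected, but it never
    // receives, which simulates a saturated consumer.
    let (tx, _rx) = sync_channel::<f64>(capacity);
    let mut accepted = 0;
    for &point in points {
        match tx.try_send(point) {
            Ok(()) => accepted += 1,
            // The queue is full: this is the backpressure signal.
            Err(TrySendError::Full(_)) => break,
            // The consumer went away entirely.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    let accepted = enqueue_until_full(2, &[1.0, 2.0, 3.0, 4.0, 5.0]);
    println!("accepted {accepted} of 5 points");
}
```

A producer that prefers to wait rather than drop or reroute points could call the blocking `send` instead of `try_send`.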
[!WARNING] This library is still under active development and may make breaking changes.
You can view the crate documentation at https://docs.rs/nominal-streaming/latest/nominal_streaming/
Conceptual overview
Data is sent to a Stream via a Writer. For example:
- A file stream is constructed as:
  let stream = new .stream_to_file .build;
- A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:
  let stream = new .stream_to_core .with_file_fallback .build;
- Or, you can build a stream that sends data to Nominal Core and to a file:
  let stream = new .stream_to_core .stream_to_file .build;
(See below for a full example, which also shows how to create the token, dataset_rid, and handle values above.)
Once we have a Stream, we can construct a Writer and send values to it:
let channel_descriptor = with_tags;
let mut writer = stream.double_writer;
// Stream single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123.0;
writer.push;
Here, we are enqueuing data onto Channel 1, with the tags "name" and "batch". These are just examples; you can choose your own channel name and tags.
Full example: streaming from memory to Nominal Core, with file fallback
This is the typical scenario where we want to stream some values from memory into a Nominal Dataset. If the upload fails (say, because of network errors), we'd like to send the data to an Avro file instead.
Note that we set up the async Tokio runtime, since that is required by the underlying NominalCoreConsumer.
use *;
use UNIX_EPOCH;
static DATASET_RID: &str = "ri.catalog...."; // your dataset ID here
async
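The fallback behavior described in this section can be illustrated with a small standalone sketch. It does not use the nominal-streaming API: `send_with_fallback`, `Delivery`, `failing_upload`, and `ok_upload` are hypothetical names, the upload is simulated by a plain function, and the fallback file is plain text rather than Avro. It only shows the control flow of "try the primary destination, write to disk if that fails".

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Where a batch ended up (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
enum Delivery {
    Primary,
    Fallback,
}

/// Tries `upload` first; if it fails, appends the batch to `fallback_path`,
/// one value per line. Plain text stands in for the Avro file here.
fn send_with_fallback<F>(batch: &[f64], upload: F, fallback_path: &Path) -> io::Result<Delivery>
where
    F: FnOnce(&[f64]) -> io::Result<()>,
{
    match upload(batch) {
        Ok(()) => Ok(Delivery::Primary),
        Err(_) => {
            let mut file = OpenOptions::new()
                .create(true)
                .append(true)
                .open(fallback_path)?;
            for value in batch {
                writeln!(file, "{value}")?;
            }
            Ok(Delivery::Fallback)
        }
    }
}

/// Stand-in for an upload that hits a network failure.
fn failing_upload(_batch: &[f64]) -> io::Result<()> {
    Err(io::Error::new(io::ErrorKind::Other, "network down"))
}

/// Stand-in for an upload that succeeds.
fn ok_upload(_batch: &[f64]) -> io::Result<()> {
    Ok(())
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("fallback_sketch.txt");
    println!("{:?}", send_with_fallback(&[1.0, 2.0], ok_upload, &path)?);
    println!("{:?}", send_with_fallback(&[3.0, 4.0], failing_upload, &path)?);
    Ok(())
}
```

Appending rather than truncating means that repeated failures accumulate in the same file, so earlier fallback data is not overwritten.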
Additional configuration
Stream options
Above, you saw an example using NominalStreamOpts::default.
The following stream options can be set using .with_options(...) on the StreamBuilder:
NominalStreamOpts
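As a rough illustration of what options of this kind can control, the sketch below batches points and flushes them when either a size limit or an age limit is reached. Everything in it is an assumption made for illustration: `SketchOpts`, its fields `max_points` and `max_age`, and `Buffer` are hypothetical and are not the fields or types of NominalStreamOpts. Consult the crate documentation for the real option names and defaults.

```rust
use std::time::{Duration, Instant};

/// Hypothetical options; the field names are illustrative and are NOT the
/// fields of `NominalStreamOpts`.
#[derive(Debug, Clone)]
struct SketchOpts {
    /// Flush once this many points are buffered.
    max_points: usize,
    /// Flush once the oldest buffered point is at least this old.
    max_age: Duration,
}

impl Default for SketchOpts {
    fn default() -> Self {
        Self {
            max_points: 1000,
            max_age: Duration::from_millis(100),
        }
    }
}

/// A buffer that hands back a batch whenever one of the limits is reached.
struct Buffer {
    opts: SketchOpts,
    points: Vec<f64>,
    oldest: Option<Instant>,
}

impl Buffer {
    fn new(opts: SketchOpts) -> Self {
        Self { opts, points: Vec::new(), oldest: None }
    }

    /// Adds a point observed at `now`. Returns the buffered batch if the
    /// size or age limit has been reached, otherwise `None`.
    fn push(&mut self, now: Instant, value: f64) -> Option<Vec<f64>> {
        // Remember when the first point of the current batch arrived.
        let oldest = *self.oldest.get_or_insert(now);
        self.points.push(value);
        let too_many = self.points.len() >= self.opts.max_points;
        let too_old = now.duration_since(oldest) >= self.opts.max_age;
        if too_many || too_old {
            self.oldest = None;
            Some(std::mem::take(&mut self.points))
        } else {
            None
        }
    }
}

fn main() {
    // Override one option and keep the default for the other.
    let opts = SketchOpts { max_points: 3, ..SketchOpts::default() };
    let mut buffer = Buffer::new(opts);
    let now = Instant::now();
    for value in [1.0, 2.0, 3.0] {
        if let Some(batch) = buffer.push(now, value) {
            println!("flushing {} points", batch.len());
        }
    }
}
```

This sketch checks the age limit only when a new point arrives. A production implementation would normally also flush on a timer, so that a quiet stream does not hold data past the limit.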
Logging errors
Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the StreamBuilder using .enable_logging():
let stream = new
.stream_to_core
.with_file_fallback
.enable_logging
.build;