Nominal Streaming
nominal-streaming is a Rust library for streaming data into Nominal Core.
The library aims to balance three concerns:
- Data should exist in-memory only for a limited, configurable amount of time before it's sent to Core.
- Writes should fall back to disk if there are network failures.
- Backpressure should be applied to incoming requests when network throughput is saturated.
The library provides configuration points to manage the tradeoff between these concerns.
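Two of these concerns interact: bounding how long data stays in memory, and applying backpressure when the consumer cannot keep up. The std-only sketch below illustrates both. It is not how nominal-streaming is implemented. The `run_batcher` function, its `max_points` and `max_delay` parameters, and the use of a bounded `std::sync::mpsc::sync_channel` are assumptions chosen for illustration, and flushed batches are simply collected and returned rather than sent anywhere.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical batcher: flushes a batch once it holds `max_points` points,
/// or once `max_delay` has passed since its first point arrived.
fn run_batcher(rx: Receiver<f64>, max_points: usize, max_delay: Duration) -> Vec<Vec<f64>> {
    let mut flushed = Vec::new();
    let mut batch: Vec<f64> = Vec::new();
    let mut started = Instant::now();
    loop {
        // Wait only as long as the current batch may still stay in memory.
        let remaining = max_delay.saturating_sub(started.elapsed());
        match rx.recv_timeout(remaining) {
            Ok(point) => {
                if batch.is_empty() {
                    started = Instant::now(); // a new batch starts its timer
                }
                batch.push(point);
                if batch.len() >= max_points {
                    flushed.push(std::mem::take(&mut batch)); // size-based flush
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                if !batch.is_empty() {
                    flushed.push(std::mem::take(&mut batch)); // time-based flush
                }
                started = Instant::now(); // restart the timer for the next batch
            }
            Err(RecvTimeoutError::Disconnected) => {
                if !batch.is_empty() {
                    flushed.push(batch); // flush whatever is left on shutdown
                }
                return flushed;
            }
        }
    }
}

fn main() {
    // Capacity 2: `send` blocks once two points are waiting, which is the
    // backpressure a producer feels when the consumer falls behind.
    let (tx, rx) = sync_channel::<f64>(2);
    let consumer = thread::spawn(move || run_batcher(rx, 2, Duration::from_secs(10)));
    for i in 1..=5 {
        tx.send(i as f64).unwrap();
    }
    drop(tx); // closing the channel lets the batcher flush and exit
    println!("{:?}", consumer.join().unwrap());
}
```

Lowering `max_delay` shortens the time data spends in memory at the cost of smaller, more frequent batches; lowering the channel capacity makes producers feel backpressure sooner.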
Warning: This library is still under active development and may make breaking changes.
You can view the crate documentation at https://docs.rs/nominal-streaming/latest/nominal_streaming/
Conceptual overview
Data points are sent to a Consumer, which is responsible for delivering them: for example, sending them to a dataset in Nominal Core or saving them to disk.
A NominalDatasetStream is the mechanism by which data points are fed to the consumer.
We construct a stream from a consumer as follows:
use AvroFileConsumer;
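let avro_consumer = AvroFileConsumer::new_with_full_path("/tmp/example.avro") // example output path
    .expect("failed to create Avro file consumer");
let stream = NominalDatasetStream::new_with_consumer(avro_consumer, NominalStreamOpts::default());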
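The division of labour between a stream and its consumer can be sketched in plain Rust. The `Consumer` trait, `MemoryConsumer`, and `Stream` below are hypothetical stand-ins, not nominal-streaming's actual types or signatures: callers only hand points to the stream, and the wrapped consumer alone decides where they end up.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a consumer: decides where a batch of points goes.
trait Consumer {
    fn consume(&self, channel: &str, points: &[f64]) -> Result<(), String>;
}

/// A consumer that keeps everything in memory (useful for tests).
#[derive(Default)]
struct MemoryConsumer {
    received: RefCell<Vec<(String, Vec<f64>)>>,
}

impl Consumer for MemoryConsumer {
    fn consume(&self, channel: &str, points: &[f64]) -> Result<(), String> {
        self.received.borrow_mut().push((channel.to_string(), points.to_vec()));
        Ok(())
    }
}

/// Hypothetical stream: accepts points and forwards them to its consumer.
struct Stream<C: Consumer> {
    consumer: C,
}

impl<C: Consumer> Stream<C> {
    fn new(consumer: C) -> Self {
        Stream { consumer }
    }

    fn enqueue(&self, channel: &str, points: Vec<f64>) -> Result<(), String> {
        self.consumer.consume(channel, &points)
    }
}

fn main() {
    let stream = Stream::new(MemoryConsumer::default());
    stream.enqueue("channel_1", vec![1.0, 2.0]).unwrap();
    println!("{:?}", stream.consumer.received.borrow());
}
```

Swapping `MemoryConsumer` for another implementation changes the destination without touching the code that enqueues points.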
Recall that the consumer takes the data points and sends them somewhere: in this case, into an Avro file. We can now push data onto the stream:
let mut points = Vec::new();
// ... add data onto points, e.g. with points.push(...) ...
// Stream to Avro file
stream.enqueue;
Note that we are enqueuing our data onto Channel 1 with the tags "name" and "batch". These are just examples; you can choose your own channel names and tags.
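Conceptually, a channel name together with its tags identifies the series a point belongs to. The sketch below groups points by such a key. The `SeriesKey` type, the `group_points` function, and the tag values are hypothetical; they illustrate the idea and do not reflect how the crate represents channels.

```rust
use std::collections::BTreeMap;

/// Hypothetical series key: channel name plus tags. A BTreeMap keeps tags
/// sorted, so the same tags given in any order produce an equal key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct SeriesKey {
    channel: String,
    tags: BTreeMap<String, String>,
}

impl SeriesKey {
    fn new(channel: &str, tags: &[(&str, &str)]) -> Self {
        SeriesKey {
            channel: channel.to_string(),
            tags: tags.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(),
        }
    }
}

/// Groups values by series so each series' points can be sent together.
fn group_points(points: Vec<(SeriesKey, f64)>) -> BTreeMap<SeriesKey, Vec<f64>> {
    let mut grouped: BTreeMap<SeriesKey, Vec<f64>> = BTreeMap::new();
    for (key, value) in points {
        grouped.entry(key).or_default().push(value);
    }
    grouped
}

fn main() {
    let a = SeriesKey::new("channel_1", &[("name", "sensor-a"), ("batch", "1")]);
    let b = SeriesKey::new("channel_1", &[("batch", "1"), ("name", "sensor-a")]);
    let c = SeriesKey::new("channel_1", &[("name", "sensor-a"), ("batch", "2")]);
    let grouped = group_points(vec![(a, 1.0), (b, 2.0), (c, 3.0)]);
    // `a` and `b` are the same series; `c` differs in its "batch" tag.
    println!("{} series", grouped.len());
}
```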
Stream options
The example above uses NominalStreamOpts::default(). The following stream options can be set:
NominalStreamOpts
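A common Rust idiom for options structs with a `Default` implementation is to start from the defaults and override only selected fields using struct update syntax. The sketch uses a hypothetical `ExampleOpts` struct whose fields and default values are invented for illustration; the real fields of NominalStreamOpts are listed in the crate documentation.

```rust
use std::time::Duration;

/// Hypothetical options struct; fields and defaults are illustrative only.
#[derive(Debug, Clone, PartialEq)]
struct ExampleOpts {
    /// Upper bound on how long a point may wait in memory before being sent.
    max_delay: Duration,
    /// How many batches may be queued before producers are made to wait.
    max_buffered_batches: usize,
}

impl Default for ExampleOpts {
    fn default() -> Self {
        ExampleOpts {
            max_delay: Duration::from_millis(100),
            max_buffered_batches: 4,
        }
    }
}

fn main() {
    // Allow more queued batches (more memory, fewer stalls); keep the default delay.
    let opts = ExampleOpts {
        max_buffered_batches: 16,
        ..ExampleOpts::default()
    };
    println!("{:?}", opts);
}
```

This only works from outside a crate if the struct's fields are public and the struct is not marked `#[non_exhaustive]`; otherwise you would call `default()` and assign fields individually.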
Full example: streaming from memory to Nominal Core
In the simplest case, we stream some values from memory into a Nominal dataset.
Note that the NominalCoreConsumer requires the async Tokio runtime.
use *;
use std::time::UNIX_EPOCH;
static DATASET_RID: &str = "ri.catalog...."; // your dataset ID here
async
The Cargo.toml will contain the following dependencies:
[dependencies]
= "0.867.0"
= "0.2.0"
tokio = { version = "1", features = ["full", "tracing"] }
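The example imports `UNIX_EPOCH`, which suggests that points are timestamped relative to the Unix epoch. As a hedged illustration of producing "values from memory", the std-only sketch below builds evenly spaced timestamped values. The `make_points` helper and the `(i64, f64)` representation (nanoseconds since the epoch, value) are assumptions; the crate's own point types may differ.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: `count` points spaced `step` apart starting at `start`,
/// each as (nanoseconds since the Unix epoch, value).
fn make_points(start: SystemTime, step: Duration, count: u32) -> Vec<(i64, f64)> {
    (0..count)
        .map(|i| {
            let t = start + step * i;
            let nanos = t
                .duration_since(UNIX_EPOCH)
                .expect("timestamp before Unix epoch")
                .as_nanos() as i64;
            // Example signal: a sine wave sampled at each timestamp.
            (nanos, (i as f64 * 0.1).sin())
        })
        .collect()
}

fn main() {
    let points = make_points(SystemTime::now(), Duration::from_millis(10), 5);
    for (ts, value) in &points {
        println!("{ts} {value:.3}");
    }
}
```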
Streaming with fallback
It is often essential to capture data even when the network connection is interrupted. For that purpose, the library supports a fallback: if writing to the primary consumer fails, it attempts to write to a secondary consumer instead:
use RequestConsumerWithFallback;
let stream = new_with_consumer;
Similarly, you can use DualWriteRequestConsumer to send data to two consumers simultaneously.
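The fallback and dual-write behaviours can both be sketched as wrappers around a common consumer trait. Everything here (`Consumer`, `WithFallback`, `DualWrite`, `Recorder`) is hypothetical and only mirrors the behaviour described above; it is not the crate's implementation, and a real dual-write consumer may treat partial failures differently.

```rust
use std::cell::RefCell;

/// Hypothetical consumer trait for this sketch.
trait Consumer {
    fn consume(&self, points: &[f64]) -> Result<(), String>;
}

/// Tries `primary`; only if it fails does the batch go to `fallback`.
struct WithFallback<P, F> {
    primary: P,
    fallback: F,
}

impl<P: Consumer, F: Consumer> Consumer for WithFallback<P, F> {
    fn consume(&self, points: &[f64]) -> Result<(), String> {
        match self.primary.consume(points) {
            Ok(()) => Ok(()),
            Err(primary_err) => self
                .fallback
                .consume(points)
                .map_err(|fallback_err| format!("{primary_err}; fallback: {fallback_err}")),
        }
    }
}

/// Sends every batch to both consumers; returns the first error, if any.
struct DualWrite<A, B> {
    first: A,
    second: B,
}

impl<A: Consumer, B: Consumer> Consumer for DualWrite<A, B> {
    fn consume(&self, points: &[f64]) -> Result<(), String> {
        // Attempt both writes before inspecting either result.
        let first = self.first.consume(points);
        let second = self.second.consume(points);
        first.and(second)
    }
}

/// Test double: records batches, or always fails if `fail` is set.
#[derive(Default)]
struct Recorder {
    fail: bool,
    batches: RefCell<Vec<Vec<f64>>>,
}

impl Consumer for Recorder {
    fn consume(&self, points: &[f64]) -> Result<(), String> {
        if self.fail {
            return Err("network unavailable".to_string());
        }
        self.batches.borrow_mut().push(points.to_vec());
        Ok(())
    }
}

fn main() {
    let offline = Recorder { fail: true, ..Default::default() };
    let with_fallback = WithFallback { primary: offline, fallback: Recorder::default() };
    with_fallback.consume(&[1.0, 2.0]).unwrap();
    // The primary failed, so the batch landed in the fallback.
    println!("{:?}", with_fallback.fallback.batches.borrow());

    let dual = DualWrite { first: Recorder::default(), second: Recorder::default() };
    dual.consume(&[3.0]).unwrap();
}
```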
Logging errors
When things go wrong, we usually also want some form of error reporting.
That is the purpose of the ListeningWriteRequestConsumer:
use ListeningWriteRequestConsumer;
use LoggingListener;
use std::sync::Arc;
let consumer_with_logging = new;
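The listening pattern can be sketched as a wrapper that forwards each batch to an inner consumer and notifies a list of shared listeners when it fails. The `Consumer` and `Listener` traits, `Listening`, `Collector`, and `AlwaysFails` are hypothetical names for illustration; they are not the crate's types, and a real logging listener would emit a log record instead of collecting messages.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical consumer trait for this sketch.
trait Consumer {
    fn consume(&self, points: &[f64]) -> Result<(), String>;
}

/// Hypothetical listener: notified whenever the wrapped consumer fails.
trait Listener {
    fn on_error(&self, error: &str);
}

/// Reports the inner consumer's errors to every listener, then passes them on.
struct Listening<C> {
    inner: C,
    listeners: Vec<Arc<dyn Listener>>,
}

impl<C: Consumer> Consumer for Listening<C> {
    fn consume(&self, points: &[f64]) -> Result<(), String> {
        let result = self.inner.consume(points);
        if let Err(e) = &result {
            for listener in &self.listeners {
                listener.on_error(e);
            }
        }
        result
    }
}

/// Listener that stores messages instead of logging them.
#[derive(Default)]
struct Collector {
    messages: Mutex<Vec<String>>,
}

impl Listener for Collector {
    fn on_error(&self, error: &str) {
        self.messages.lock().unwrap().push(error.to_string());
    }
}

/// A consumer that always fails, standing in for an unreachable server.
struct AlwaysFails;

impl Consumer for AlwaysFails {
    fn consume(&self, _points: &[f64]) -> Result<(), String> {
        Err("connection refused".to_string())
    }
}

fn main() {
    let collector = Arc::new(Collector::default());
    let consumer = Listening {
        inner: AlwaysFails,
        listeners: vec![collector.clone() as Arc<dyn Listener>],
    };
    assert!(consumer.consume(&[1.0]).is_err());
    println!("{:?}", collector.messages.lock().unwrap());
}
```

Sharing listeners through `Arc` lets the caller keep a handle to the same listener the consumer notifies.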
You'll also need to enable tracing in main:
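use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
tracing_subscriber::registry()
    .with(
        EnvFilter::builder()
            // Level used when RUST_LOG is not set; INFO is an example choice.
            .with_default_directive(LevelFilter::INFO.into())
            .from_env_lossy(),
    )
    .with(tracing_subscriber::fmt::layer())
    .init();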
And add the necessary tracing dependencies to Cargo.toml:
tracing = "^0.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
If you want to avoid printing full backtraces for errors, you can customize how errors are printed:
use NominalStreamListener;
use Error;
use tracing::error;
;
let stream = new;
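The difference between a one-line error message and a full report can be sketched with the standard `std::error::Error::source` chain. The `UploadError` type and the `short_report` and `full_report` helpers are hypothetical; a custom listener could, for example, log only something like `short_report` to keep output compact.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error that wraps an underlying cause.
#[derive(Debug)]
struct UploadError {
    cause: std::io::Error,
}

impl fmt::Display for UploadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to upload batch")
    }
}

impl Error for UploadError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.cause)
    }
}

/// Only the top-level message.
fn short_report(err: &dyn Error) -> String {
    err.to_string()
}

/// The top-level message followed by every underlying cause.
fn full_report(err: &dyn Error) -> String {
    let mut report = err.to_string();
    let mut source = err.source();
    while let Some(cause) = source {
        report.push_str(": ");
        report.push_str(&cause.to_string());
        source = cause.source();
    }
    report
}

fn main() {
    let err = UploadError {
        cause: std::io::Error::new(std::io::ErrorKind::Other, "connection reset"),
    };
    println!("{}", short_report(&err));
    println!("{}", full_report(&err));
}
```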
The builder interface
Recent versions of nominal-streaming include a builder interface that makes all of the above simpler.
For example:
use NominalDatasetStreamBuilder;
let handle = tokio::runtime::Handle::current();
let dataset_rid = new.unwrap;
let stream = NominalDatasetStreamBuilder::new()
.stream_to_core
.with_file_fallback
    .build();
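The builder pattern simplifies setup because each method records one choice and `build` validates the combination in a single place. The sketch below illustrates this with a hypothetical `StreamConfigBuilder`; its methods, the `StreamConfig` type, and the dataset identifier are invented and do not reflect the real signatures of NominalDatasetStreamBuilder.

```rust
use std::path::PathBuf;

/// Hypothetical description of where a stream should send its data.
#[derive(Debug, PartialEq)]
struct StreamConfig {
    core_dataset: Option<String>,
    fallback_file: Option<PathBuf>,
}

/// Hypothetical builder: each method records one choice; `build` validates them.
#[derive(Default)]
struct StreamConfigBuilder {
    core_dataset: Option<String>,
    fallback_file: Option<PathBuf>,
}

impl StreamConfigBuilder {
    fn new() -> Self {
        Self::default()
    }

    fn core_dataset(mut self, dataset: &str) -> Self {
        self.core_dataset = Some(dataset.to_string());
        self
    }

    fn file_fallback(mut self, path: impl Into<PathBuf>) -> Self {
        self.fallback_file = Some(path.into());
        self
    }

    fn build(self) -> Result<StreamConfig, String> {
        // A fallback is only meaningful alongside a primary destination.
        if self.core_dataset.is_none() {
            return Err("no primary destination configured".to_string());
        }
        Ok(StreamConfig {
            core_dataset: self.core_dataset,
            fallback_file: self.fallback_file,
        })
    }
}

fn main() {
    let config = StreamConfigBuilder::new()
        .core_dataset("ri.example.dataset") // hypothetical identifier
        .file_fallback("/tmp/fallback.avro")
        .build()
        .expect("valid configuration");
    println!("{:?}", config);
}
```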