nominal-streaming 0.4.0

Library for durable, low-latency streaming into Nominal Core

Nominal Streaming

nominal-streaming is a Rust library for streaming data into Nominal Core.

The library aims to balance three concerns:

  1. Data should exist in memory only for a limited, configurable amount of time before it's sent to Core.
  2. Writes should fall back to disk if there are network failures.
  3. Backpressure should be applied to incoming requests when network throughput is saturated.

The library provides configuration points to manage the tradeoff between these concerns.
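The third concern, backpressure, can be illustrated with nothing but the standard library. The sketch below is not the crate's implementation. It assumes a bounded queue of pending requests; the names request_queue and offer are hypothetical. It uses std::sync::mpsc::sync_channel, whose try_send hands the value back once the buffer is full.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical request queue: a bounded channel whose capacity caps how many
/// requests can wait to be sent at once (the role a setting such as
/// max_buffered_requests might play; this is an assumption).
pub fn request_queue(capacity: usize) -> (SyncSender<Vec<f64>>, Receiver<Vec<f64>>) {
    sync_channel(capacity)
}

/// Offer a batch without blocking. If the queue is full (or the receiving side
/// is gone), the batch is handed back so the producer can wait and retry.
/// That refusal is the backpressure.
pub fn offer(tx: &SyncSender<Vec<f64>>, batch: Vec<f64>) -> Result<(), Vec<f64>> {
    match tx.try_send(batch) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(batch)) | Err(TrySendError::Disconnected(batch)) => Err(batch),
    }
}
```

A producer that calls send instead of try_send simply blocks until the receiver drains a request. That is the other common way to apply backpressure.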

Warning: This library is still under active development and may introduce breaking changes.

You can view the crate documentation at https://docs.rs/nominal-streaming/latest/nominal_streaming/

Conceptual overview

Data points are sent to a consumer, which is responsible for delivering them somewhere: for example, to a dataset in Nominal Core, or to a file on disk. A NominalDatasetStream is the mechanism that feeds data points to the consumer.
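The following toy model uses only the standard library to make the split between stream and consumer concrete. Everything in it is hypothetical and much simpler than the crate's types, including the Consumer trait, MemoryConsumer, and ToyStream. The stream accepts batches from the caller, and a background thread hands them to the consumer, which decides where they go.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a consumer: anything that accepts a batch.
pub trait Consumer: Send + 'static {
    fn consume(&self, batch: Vec<f64>) -> Result<(), String>;
}

/// Toy consumer that keeps every batch in memory (a real one might write
/// to Nominal Core or to an Avro file).
#[derive(Clone, Default)]
pub struct MemoryConsumer {
    pub batches: Arc<Mutex<Vec<Vec<f64>>>>,
}

impl Consumer for MemoryConsumer {
    fn consume(&self, batch: Vec<f64>) -> Result<(), String> {
        self.batches.lock().unwrap().push(batch);
        Ok(())
    }
}

/// Toy stream: callers enqueue batches; a worker thread feeds them to the consumer.
pub struct ToyStream {
    tx: Option<Sender<Vec<f64>>>,
    worker: Option<JoinHandle<()>>,
}

impl ToyStream {
    pub fn new_with_consumer<C: Consumer>(consumer: C) -> Self {
        let (tx, rx) = channel::<Vec<f64>>();
        let worker = thread::spawn(move || {
            // Runs until the sender is dropped and the queue is drained
            for batch in rx {
                if let Err(e) = consumer.consume(batch) {
                    eprintln!("consumer error: {e}");
                }
            }
        });
        ToyStream { tx: Some(tx), worker: Some(worker) }
    }

    pub fn enqueue(&self, points: Vec<f64>) {
        if let Some(tx) = &self.tx {
            // send fails only if the worker has exited; ignored in this sketch
            let _ = tx.send(points);
        }
    }
}

impl Drop for ToyStream {
    fn drop(&mut self) {
        // Close the queue, then wait for the worker to deliver what is left
        drop(self.tx.take());
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}
```

Dropping a ToyStream closes its queue and waits for the worker, so every enqueued batch reaches the consumer before drop returns. Check the crate documentation for how NominalDatasetStream itself handles shutdown.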

We construct a stream from a consumer as follows:

use nominal_streaming::consumer::AvroFileConsumer;
use nominal_streaming::prelude::*;

// Write every data point to a local Avro file
let avro_consumer = AvroFileConsumer::new_with_full_path("/tmp/my_stream.avro")
    .expect("Could not open Avro file");
let stream = NominalDatasetStream::new_with_consumer(avro_consumer, NominalStreamOpts::default());

Recall that the consumer takes the data points and sends them somewhere: in this case, into an Avro file. We can now push data onto the stream:

let mut points = Vec::new();

// Add data points; timestamps are seconds + nanoseconds since the UNIX epoch
points.push(DoublePoint {
    timestamp: Some(Timestamp {
        seconds: 0,
        nanos: 0,
    }),
    value: 123.45,
});

// Stream to the Avro file
stream.enqueue(
    &ChannelDescriptor::with_tags("channel_1", [("name", "my stream"), ("batch", "1")]),
    points,
);

Note that we are enqueuing our data onto channel_1, with the tags "name" and "batch". These are just examples; you can choose your own channel names and tags.

Stream options

Above, you saw an example using NominalStreamOpts::default(). The following stream options can be set:

NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
}
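The option names suggest how the first two relate to the in-memory time limit from the overview. A request would be sent once it holds max_points_per_record points, or once max_request_delay has passed, whichever comes first. That reading is an assumption based on the names, not on the crate's documentation. The standard-library sketch below models just that rule with a hypothetical Batcher. It leaves out max_buffered_requests and request_dispatcher_tasks, which presumably control the queue depth and the number of sending tasks.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batcher: flushes when it holds `max_points` points or when its
/// oldest point has waited `max_delay`, whichever comes first.
pub struct Batcher {
    max_points: usize,
    max_delay: Duration,
    points: Vec<f64>,
    oldest: Option<Instant>,
}

impl Batcher {
    pub fn new(max_points: usize, max_delay: Duration) -> Self {
        Batcher { max_points, max_delay, points: Vec::new(), oldest: None }
    }

    /// Add a point observed at `now`; returns a batch if one is ready to send.
    pub fn push(&mut self, value: f64, now: Instant) -> Option<Vec<f64>> {
        if self.oldest.is_none() {
            self.oldest = Some(now);
        }
        self.points.push(value);
        if self.points.len() >= self.max_points {
            return self.take();
        }
        self.poll(now)
    }

    /// Timer path: returns the pending batch if its oldest point is too old.
    pub fn poll(&mut self, now: Instant) -> Option<Vec<f64>> {
        let first = self.oldest?;
        if now.duration_since(first) >= self.max_delay {
            self.take()
        } else {
            None
        }
    }

    fn take(&mut self) -> Option<Vec<f64>> {
        self.oldest = None;
        if self.points.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.points))
        }
    }
}
```

Passing `now` in explicitly keeps the flush rule deterministic. A real stream would also call something like poll from a timer, so that a quiet channel still gets flushed.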

Full example: streaming from memory to Nominal Core

In the simplest case, we stream values from memory into a Nominal Dataset.

Note that NominalCoreConsumer requires a Tokio async runtime.

use nominal_streaming::prelude::*;
use std::time::UNIX_EPOCH;


static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here


fn core_consumer() -> NominalCoreConsumer<BearerToken> {
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();

    NominalCoreConsumer::new(
        STAGING_STREAMING_CLIENT.clone(),
        // Handle::current() panics if called outside a Tokio runtime
        tokio::runtime::Handle::current(),
        token,
        dataset_rid,
    )
}


fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}


async fn async_main() {
    let stream = NominalDatasetStream::new_with_consumer(
        core_consumer(),
        NominalStreamOpts::default()
    );

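    // Generate 50 batches of test data, each containing 100,000 data points
    for batch in 0..50 {
        let mut points = Vec::new();
        // One start time per batch; points are spaced 1 ns apart
        let start_time = UNIX_EPOCH.elapsed().unwrap();

        for i in 0..100_000u64 {
            // Add the offset as a Duration so that nanoseconds past one second
            // carry into `seconds` instead of overflowing `nanos`
            let t = start_time + std::time::Duration::from_nanos(i);
            points.push(DoublePoint {
                timestamp: Some(Timestamp {
                    seconds: t.as_secs() as i64,
                    nanos: t.subsec_nanos() as i32,
                }),
                value: (i % 50) as f64,
            });
        }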

        // Push current batch onto the upload queue
        println!("Enqueue batch: {}", batch);
        stream.enqueue(
            &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
            points,
        );
    }
}
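Generating timestamps, as the loop above does, needs some care. Assume Timestamp follows the google.protobuf.Timestamp convention, where nanos must lie in 0..=999_999_999. Under that assumption, adding a per-point offset straight to the nanos field can push it out of range. A safer pattern is to add the offset as a std::time::Duration and split the result afterwards. The sketch below shows this pattern; the helper names are hypothetical.

```rust
use std::time::Duration;

/// Split a time since the UNIX epoch into (seconds, nanos), assuming the
/// google.protobuf.Timestamp convention: nanos always in 0..=999_999_999.
pub fn to_seconds_nanos(since_epoch: Duration) -> (i64, i32) {
    // subsec_nanos() is always below 1_000_000_000, so it fits in an i32
    (since_epoch.as_secs() as i64, since_epoch.subsec_nanos() as i32)
}

/// Hypothetical helper: timestamp of the i-th point, spaced `step` apart from
/// `start`. Adding in Duration space carries overflowing nanoseconds into
/// seconds, which adding `i` straight to a nanos field would not.
pub fn point_time(start: Duration, i: u32, step: Duration) -> (i64, i32) {
    to_seconds_nanos(start + step * i)
}
```

For example, starting 10 ns before a second boundary, point 15 lands 5 ns into the next second rather than at an invalid nanos value of 1,000,000,005.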

The Cargo.toml will contain the following dependencies:

[dependencies]
nominal-api = "0.867.0"
nominal-streaming = "0.2.0"
tokio = { version = "1", features = ["full", "tracing"] }

Streaming with fallback

Often, it is imperative to capture data even when the network connection is interrupted. For that purpose, the library supports a fallback consumer. If writing to the primary consumer fails, it writes to a secondary consumer instead:

use nominal_streaming::consumer::RequestConsumerWithFallback;

let stream = NominalDatasetStream::new_with_consumer(
    RequestConsumerWithFallback::new(core_consumer(), avro_consumer),
    NominalStreamOpts::default(),
);

Similarly, you can use DualWriteRequestConsumer to send data to two consumers simultaneously.
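Conceptually, both wrappers are consumers that delegate to other consumers. The standard-library sketch below illustrates that composition. It uses a hypothetical Consumer trait and hypothetical WithFallback and DualWrite types. It is not the crate's RequestConsumerWithFallback or DualWriteRequestConsumer, whose error handling may differ.

```rust
use std::cell::RefCell;

/// Hypothetical consumer trait, not the crate's own.
pub trait Consumer {
    fn consume(&self, batch: &[f64]) -> Result<(), String>;
}

/// Writes to `primary`; only if that fails does the batch go to `secondary`.
pub struct WithFallback<P, S> {
    pub primary: P,
    pub secondary: S,
}

impl<P: Consumer, S: Consumer> Consumer for WithFallback<P, S> {
    fn consume(&self, batch: &[f64]) -> Result<(), String> {
        self.primary.consume(batch).or_else(|primary_err| {
            self.secondary
                .consume(batch)
                .map_err(|e| format!("primary failed ({primary_err}); fallback failed ({e})"))
        })
    }
}

/// Writes every batch to both consumers; reports an error if either fails.
pub struct DualWrite<A, B> {
    pub a: A,
    pub b: B,
}

impl<A: Consumer, B: Consumer> Consumer for DualWrite<A, B> {
    fn consume(&self, batch: &[f64]) -> Result<(), String> {
        let ra = self.a.consume(batch);
        let rb = self.b.consume(batch); // attempted even if `a` failed
        ra.and(rb)
    }
}

/// Test double: records batches, or always fails if `fail` is set.
#[derive(Default)]
pub struct Recorder {
    pub fail: bool,
    pub seen: RefCell<Vec<Vec<f64>>>,
}

impl Consumer for Recorder {
    fn consume(&self, batch: &[f64]) -> Result<(), String> {
        if self.fail {
            return Err("network unavailable".to_string());
        }
        self.seen.borrow_mut().push(batch.to_vec());
        Ok(())
    }
}
```

Because each wrapper is itself a consumer, the wrappers nest. For example, one leg of a dual write could have its own fallback.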

Logging errors

Most of the time, when things go wrong, we also want some form of reporting. That is the purpose of the ListeningWriteRequestConsumer:

use nominal_streaming::consumer::ListeningWriteRequestConsumer;
use nominal_streaming::notifier::LoggingListener;
use std::sync::Arc;

let consumer_with_logging = ListeningWriteRequestConsumer::new(
    core_consumer(),
    vec![Arc::new(LoggingListener)]
);

You'll also need to enable tracing in main:

use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

// Default to DEBUG-level logging; RUST_LOG can adjust the filter
tracing_subscriber::registry()
    .with(
        tracing_subscriber::fmt::layer()
            .with_thread_ids(true)
            .with_thread_names(true)
            .with_line_number(true),
    )
    .with(
        EnvFilter::builder()
            .with_default_directive(LevelFilter::DEBUG.into())
            .from_env_lossy(),
    )
    .init();

And add the necessary tracing dependencies to Cargo.toml:

tracing = "^0.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

If you would rather not log the full error for every failure, write a custom listener that logs only the message:

use nominal_streaming::consumer::ListeningWriteRequestConsumer;
use nominal_streaming::notifier::NominalStreamListener;
use std::error::Error;
use std::sync::Arc;
use tracing::error;

#[derive(Debug, Default, Clone)]
pub struct MyListener;

impl NominalStreamListener for MyListener {
    // Log only the message, not the underlying error
    fn on_error(&self, message: &str, _error: &dyn Error) {
        error!("{}", message);
    }
}

let consumer_with_logging =
    ListeningWriteRequestConsumer::new(core_consumer(), vec![Arc::new(MyListener)]);
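The listener mechanism is an instance of the observer pattern. A wrapping consumer forwards each batch and, when a write fails, calls every registered listener. The standard-library sketch below mimics the on_error signature shown above. Everything else in it is a hypothetical stand-in for NominalStreamListener and ListeningWriteRequestConsumer: the Listener and Consumer traits, the Listening wrapper, and the test types.

```rust
use std::error::Error;
use std::fmt;
use std::sync::{Arc, Mutex};

/// Hypothetical listener trait, modeled on the on_error signature above.
pub trait Listener: Send + Sync {
    fn on_error(&self, message: &str, error: &dyn Error);
}

/// Hypothetical consumer trait.
pub trait Consumer {
    fn consume(&self, batch: &[f64]) -> Result<(), Box<dyn Error>>;
}

/// Wraps a consumer and tells every listener when a write fails.
pub struct Listening<C> {
    inner: C,
    listeners: Vec<Arc<dyn Listener>>,
}

impl<C: Consumer> Listening<C> {
    pub fn new(inner: C, listeners: Vec<Arc<dyn Listener>>) -> Self {
        Listening { inner, listeners }
    }
}

impl<C: Consumer> Consumer for Listening<C> {
    fn consume(&self, batch: &[f64]) -> Result<(), Box<dyn Error>> {
        let result = self.inner.consume(batch);
        if let Err(e) = &result {
            for listener in &self.listeners {
                listener.on_error("failed to write batch", e.as_ref());
            }
        }
        // The error is still returned to the caller after notification
        result
    }
}

/// Example error and a consumer that always fails with it.
#[derive(Debug)]
pub struct Offline;

impl fmt::Display for Offline {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "network offline")
    }
}

impl Error for Offline {}

pub struct AlwaysFails;

impl Consumer for AlwaysFails {
    fn consume(&self, _batch: &[f64]) -> Result<(), Box<dyn Error>> {
        Err(Box::new(Offline))
    }
}

/// Listener that records "message: error" strings instead of logging them.
#[derive(Default)]
pub struct Recording(pub Mutex<Vec<String>>);

impl Listener for Recording {
    fn on_error(&self, message: &str, error: &dyn Error) {
        self.0.lock().unwrap().push(format!("{message}: {error}"));
    }
}
```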

The builder interface

Recent versions of nominal-streaming provide a builder interface that makes all of the above simpler.

For example:

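use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

// Must be called from within a Tokio runtime
let handle = tokio::runtime::Handle::current();
let token = BearerToken::new(
    std::env::var("NOMINAL_TOKEN")
        .expect("NOMINAL_TOKEN environment variable not set")
        .as_str(),
)
.expect("Invalid token");
let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();

// Stream to Nominal Core, falling back to a local file if that fails
let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("/tmp/fallback.avro")
    .build();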
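A builder like this presumably assembles the same kind of consumer chain as the earlier sections. The sketch below shows only the builder pattern itself. It assumes, without confirmation from the crate's documentation, that with_file_fallback wraps the Core destination in a fallback, much like RequestConsumerWithFallback. StreamBuilder and Sink are hypothetical, and build returns a description of the chain rather than a stream.

```rust
/// Hypothetical description of where data goes; the real builder returns a
/// ready-to-use stream instead.
#[derive(Debug, PartialEq)]
pub enum Sink {
    Core { dataset_rid: String },
    File { path: String },
    WithFallback(Box<Sink>, Box<Sink>),
}

#[derive(Default)]
pub struct StreamBuilder {
    core: Option<String>,
    fallback_path: Option<String>,
}

impl StreamBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    // The real method also takes a token and a runtime handle; omitted here
    pub fn stream_to_core(mut self, dataset_rid: &str) -> Self {
        self.core = Some(dataset_rid.to_string());
        self
    }

    pub fn with_file_fallback(mut self, path: &str) -> Self {
        self.fallback_path = Some(path.to_string());
        self
    }

    /// Assemble the chain; returns None if no destination was configured.
    pub fn build(self) -> Option<Sink> {
        let fallback = self.fallback_path.map(|path| Sink::File { path });
        match (self.core, fallback) {
            (Some(rid), Some(fb)) => Some(Sink::WithFallback(
                Box::new(Sink::Core { dataset_rid: rid }),
                Box::new(fb),
            )),
            (Some(rid), None) => Some(Sink::Core { dataset_rid: rid }),
            (None, Some(fb)) => Some(fb),
            (None, None) => None,
        }
    }
}
```

Each builder method consumes and returns the builder, which is what lets the calls chain in a single expression.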