Skip to main content

Crate nominal_streaming

Crate nominal_streaming 

Source
Expand description

nominal-streaming is a crate for streaming data into Nominal Core.

The library aims to balance three concerns:

  1. Data should exist in-memory only for a limited, configurable amount of time before it’s sent to Core.
  2. Writes should fall back to disk if there are network failures.
  3. Backpressure should be applied to incoming requests when network throughput is saturated.

This library streams data to Nominal Core, to a file, or to Nominal Core with a file as backup (recommended to protect against network failures). It also provides configuration to manage the tradeoff between above listed concerns.

This library is still under active development and may make breaking changes.

§Conceptual overview

Data is sent to a Stream via a Writer. For example:

  • A file stream is constructed as:

    use nominal_streaming::stream::NominalDatasetStreamBuilder;
    
    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_file("my_data.avro")
        .build();
  • A stream that sends data to Nominal Core, but writes failed requests to a file, is created as follows:

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();
  • Or, you can build a stream that sends data to Nominal Core and to a file:

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .stream_to_file("my_data.avro")
        .build();

(See below for a full example, that also shows how to create the token, dataset_rid, and handle values above.)

Once we have a Stream, we can construct a Writer and send values to it:

let channel_descriptor = ChannelDescriptor::with_tags(
    "channel_1", [("experiment_id", "123")]
);

let mut writer = stream.double_writer(&channel_descriptor);

// Stream single data point
let start_time = UNIX_EPOCH.elapsed().unwrap();
let value: f64 = 123;
writer.push(start_time, value);

Here, we are enqueuing data onto Channel 1, with tags “name” and “batch”. These are, of course, just examples, and you can choose your own.

§Example: streaming from memory to Nominal Core, with file fallback

This is the typical scenario where we want to stream some values from memory into a Nominal Dataset. If the upload fails (say because of network errors), we’d like to instead send the data to an Avro file. Note that the Avro spec does not support uint64 values, so those will be stored as signed int64 values.

Note that we set up the async Tokio runtime, since that is required by the underlying NominalCoreConsumer.

use nominal_streaming::prelude::*;
use nominal_streaming::stream::NominalDatasetStreamBuilder;

use std::time::UNIX_EPOCH;


static DATASET_RID: &str = "ri.catalog....";  // your dataset ID here


fn main() {
    // The NominalCoreConsumer requires a tokio runtime
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_name("tokio")
        .build()
        .expect("Failed to create Tokio runtime")
        .block_on(async_main());
}


async fn async_main() {
    // Configure token for authentication
    let token = BearerToken::new(
        std::env::var("NOMINAL_TOKEN")
            .expect("NOMINAL_TOKEN environment variable not set")
            .as_str(),
    )
    .expect("Invalid token");

    let dataset_rid = ResourceIdentifier::new(DATASET_RID).unwrap();
    let handle = tokio::runtime::Handle::current();

    let stream = NominalDatasetStreamBuilder::new()
        .stream_to_core(token, dataset_rid, handle)
        .with_file_fallback("fallback.avro")
        .build();

    let channel_descriptor = ChannelDescriptor::with_tags("channel_1", [("experiment_id", "123")]);

    let mut writer = stream.double_writer(&channel_descriptor);

    // Generate and upload 100,000 data points
    for i in 0..100_000 {
        let start_time = UNIX_EPOCH.elapsed().unwrap();
        let value = i % 50;
        writer.push(start_time, value as f64);
    }
}

§Additional configuration

§Stream options

Above, you saw an example using NominalStreamOpts::default. The following stream options can be set using .with_options(...) on the StreamBuilder:

NominalStreamOpts {
  max_points_per_record: usize,
  max_request_delay: Duration,
  max_buffered_requests: usize,
  request_dispatcher_tasks: usize,
}

§Logging errors

Most of the time, when things go wrong, we want some form of reporting. You can enable debug logging on the StreamBuilder by using .enable_logging():

let stream = NominalDatasetStreamBuilder::new()
    .stream_to_core(token, dataset_rid, handle)
    .with_file_fallback("fallback.avro")
    .enable_logging()
    .build();

Re-exports§

pub use nominal_api as api;

Modules§

client
consumer
listener
prelude
This includes the most common types in this crate, re-exported for your convenience.
stream
types
upload