Crate noir_compute

§Noir

Preprint

§Network of Operators In Rust

API Docs

Noir is a distributed data processing platform based on the dataflow paradigm that provides an ergonomic programming interface, similar to that of Apache Flink, but has much better performance characteristics.

Noir converts each job into a dataflow graph of operators and groups them into blocks. A block contains a sequence of operators that process the data sequentially without repartitioning it. Blocks are the deployment unit used by the system and can be distributed and executed on multiple systems.
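
To make the notion of a block more concrete, here is a minimal sketch in plain Rust. It is not Noir's actual planner: the `Op` enum and `split_into_blocks` are hypothetical names, and the rule that a block is closed right after a repartitioning operator is an assumption made for illustration.

```rust
/// Hypothetical operator kinds; Noir's real operators are richer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    Source,
    FlatMap,
    GroupBy, // assumed to repartition the data by key
    Fold,
}

impl Op {
    /// Whether the operator redistributes elements across replicas.
    pub fn repartitions(self) -> bool {
        matches!(self, Op::GroupBy)
    }
}

/// Split a linear chain of operators into blocks. A block is closed right
/// after each repartitioning operator, so the operators inside one block
/// never have a redistribution step between them.
pub fn split_into_blocks(chain: &[Op]) -> Vec<Vec<Op>> {
    let mut blocks = Vec::new();
    let mut current = Vec::new();
    for &op in chain {
        current.push(op);
        if op.repartitions() {
            // Close the current block and start a fresh one
            blocks.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        blocks.push(current);
    }
    blocks
}
```

Under these assumptions, a chain of `Source`, `FlatMap`, `GroupBy`, `Fold` yields two blocks: one ending with `GroupBy` and one containing only `Fold`.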

The common layout of a Noir program starts with the creation of a StreamContext; then one or more Sources are initialised, each creating a Stream. The graph of operators is composed using the methods of the Stream object, which follow an approach similar to Rust’s Iterator trait, allowing you to define a processing workflow ergonomically through method chaining.
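
For comparison, this is what the same method-chaining style looks like with the standard library's Iterator trait alone. The sketch is single-threaded and does not use Noir; `word_count` is an illustrative helper, not part of the crate.

```rust
use std::collections::HashMap;

/// Count word occurrences with a plain `Iterator` chain (no Noir involved).
pub fn word_count(lines: &[&str]) -> HashMap<String, usize> {
    lines
        .iter()
        // Split each line into lowercase words
        .flat_map(|line| line.split_whitespace().map(str::to_lowercase))
        // Accumulate one counter per distinct word
        .fold(HashMap::new(), |mut counts, word| {
            *counts.entry(word).or_insert(0) += 1;
            counts
        })
}
```

The Stream methods used in the examples (`flat_map`, `fold`, and so on) read much like this chain, with the difference that Noir distributes the work across replicas and hosts.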

§Examples

§Wordcount

use noir_compute::prelude::*;

fn main() {
    // Convenience method to parse deployment config from CLI arguments
    let (config, args) = RuntimeConfig::from_args();
    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    let result = env
        // Open and read file line by line in parallel
        .stream_file(&args[0])
        // Split into words
        .flat_map(|line| tokenize(&line))
        // Partition
        .group_by(|word| word.clone())
        // Count occurrences
        .fold(0, |count, _word| *count += 1)
        // Collect result
        .collect_vec();

    env.execute_blocking(); // Start execution (blocking)
    if let Some(result) = result.get() {
        // Print word counts
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    // Simple tokenisation strategy
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Execute on 6 local hosts `cargo run -- -l 6 input.txt`
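
The `group_by` step in the example routes elements so that all occurrences of the same key end up in the same place. A minimal sketch of that idea in plain Rust follows, assuming hash-based partitioning; `partition_of` and `partition_by_key` are hypothetical helpers, and Noir's actual routing may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the partition for a key by hashing it (an assumed strategy).
/// `partitions` must be greater than zero.
pub fn partition_of<K: Hash>(key: &K, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}

/// Route every element to the partition chosen by its key, so that equal
/// keys always land in the same partition.
pub fn partition_by_key<T, K: Hash>(
    items: Vec<T>,
    partitions: usize,
    key_fn: impl Fn(&T) -> K,
) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..partitions).map(|_| Vec::new()).collect();
    for item in items {
        let p = partition_of(&key_fn(&item), partitions);
        out[p].push(item);
    }
    out
}
```

Once equal keys share a partition, a per-key operation such as the `fold` in the example can run on each partition independently.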

§Wordcount associative (faster)

use noir_compute::prelude::*;

fn main() {
    // Convenience method to parse deployment config from CLI arguments
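    let (config, args) = RuntimeConfig::from_args();
    // Spawn workers on the remote hosts listed in the config (as in the first example)
    config.spawn_remote_workers();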
    let env = StreamContext::new(config);

    let result = env
        .stream_file(&args[0])
        // Adaptive batching (the default) has predictable latency.
        // Fixed-size batching often leads to shorter execution times
        // when data is immediately available and latency is not critical.
        .batch_mode(BatchMode::fixed(1024))
        .flat_map(move |line| tokenize(&line))
        .map(|word| (word, 1))
        // Associative operators split the operation in a local and a
        // global step for faster execution
        .group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
        .drop_key()
        .collect_vec();

    env.execute_blocking(); // Start execution (blocking)
    if let Some(result) = result.get() {
        // Print word counts
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Execute on multiple hosts `cargo run -- -r config.yaml input.txt`
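
The comment about associative operators describes a two-phase aggregation: each partition reduces its own data first, and only the partial results are combined afterwards. The following is a sketch of that pattern in plain, sequential Rust, not Noir's implementation; `local_counts` and `global_counts` are hypothetical names.

```rust
use std::collections::HashMap;

/// Local step: each partition counts its own words independently.
pub fn local_counts(partition: &[&str]) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for word in partition {
        *counts.entry(word.to_string()).or_insert(0) += 1;
    }
    counts
}

/// Global step: merge the partial counts. Because addition is associative
/// and commutative, merging partial sums gives the same totals as counting
/// every word in a single pass.
pub fn global_counts(partials: Vec<HashMap<String, u64>>) -> HashMap<String, u64> {
    let mut totals = HashMap::new();
    for partial in partials {
        for (word, count) in partial {
            *totals.entry(word).or_insert(0) += count;
        }
    }
    totals
}
```

The likely benefit in a distributed setting is that at most one (word, count) pair per distinct word leaves each partition, instead of one element per occurrence.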

§Remote deployment

# config.yaml
hosts:
  - address: host1.lan
    base_port: 9500
    num_cores: 16
  - address: host2.lan
    base_port: 9500
    num_cores: 8
    ssh:
      username: noir-compute
      key_file: /home/user/.ssh/id_rsa

Refer to the examples directory for an extended set of working examples.

Modules§

  • Configuration types used to initialize the StreamContext.
  • Operators that can be applied to a stream.
  • prelude: Re-export of commonly used structs and traits.
  • Types that describe the structure of an execution graph, for debugging purposes.

Structs§

  • Metadata used to initialize a block at the start of an execution.
  • A KeyedStream is like a set of Streams, each of which is partitioned by some Key. Internally it’s just a stream whose elements are (K, V) pairs, and the operators behave following the KeyedStream semantics.
  • A Stream represents a chain of operators that work on a flow of data. The type of the elements leaving the stream is Out.
  • StreamContext: Streaming environment from which it’s possible to register new streams and start the computation.
  • A WindowedStream is a data stream partitioned by Key, where the elements of each partition are divided into groups called windows. Each element can be assigned to one or multiple windows.
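
To illustrate how one element can belong to several windows, here is a generic sketch of sliding-window assignment by timestamp. It is not taken from Noir: `sliding_windows_for` is a hypothetical helper, and the convention that windows start at multiples of `slide` and cover `[start, start + size)` is an assumption.

```rust
/// Return the start timestamps of all sliding windows that contain `ts`.
/// Windows start at multiples of `slide` and cover `[start, start + size)`.
pub fn sliding_windows_for(ts: u64, size: u64, slide: u64) -> Vec<u64> {
    assert!(size > 0 && slide > 0, "size and slide must be positive");
    // Latest window start that is <= ts
    let mut start = ts - ts % slide;
    let mut starts = Vec::new();
    // Walk backwards over earlier windows while they still cover `ts`
    while start + size > ts {
        starts.push(start);
        if start < slide {
            break; // no window starts before time 0
        }
        start -= slide;
    }
    starts.reverse();
    starts
}
```

With `size = 4` and `slide = 2`, an element at timestamp 7 falls into the windows starting at 4 and 6. When `size` equals `slide` the windows do not overlap and each element belongs to exactly one of them.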

Enums§

  • BatchMode: Which policy to use for batching the messages before sending them.
  • Replication factor for a block.
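
The trade-off between the two batching policies mentioned in the examples can be sketched as follows. This is a simplified model under stated assumptions, not Noir's `BatchMode`: `Policy` and `Batcher` are hypothetical types, time is a logical counter passed in by the caller, and "adaptive" is assumed to mean "flush on size or after a maximum delay".

```rust
/// Hypothetical batching policy, loosely modelled on the two modes the
/// examples mention. Not Noir's actual `BatchMode` type.
#[derive(Debug, Clone, Copy)]
pub enum Policy {
    /// Flush only when `size` elements are buffered.
    Fixed { size: usize },
    /// Flush when `size` elements are buffered, or when the oldest buffered
    /// element has waited at least `max_delay` time units.
    Adaptive { size: usize, max_delay: u64 },
}

pub struct Batcher<T> {
    policy: Policy,
    buffer: Vec<T>,
    oldest: u64, // arrival time of the first buffered element
}

impl<T> Batcher<T> {
    pub fn new(policy: Policy) -> Self {
        Batcher { policy, buffer: Vec::new(), oldest: 0 }
    }

    /// Buffer `item` arriving at logical time `now` (assumed monotonic);
    /// return a batch if the policy says it is time to send one.
    pub fn push(&mut self, item: T, now: u64) -> Option<Vec<T>> {
        if self.buffer.is_empty() {
            self.oldest = now;
        }
        self.buffer.push(item);
        let full = match self.policy {
            Policy::Fixed { size } | Policy::Adaptive { size, .. } => self.buffer.len() >= size,
        };
        let late = match self.policy {
            Policy::Fixed { .. } => false,
            Policy::Adaptive { max_delay, .. } => now - self.oldest >= max_delay,
        };
        if full || late {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

In this model the fixed policy always sends full batches, which favours throughput, while the adaptive policy bounds how long an element can wait. A real implementation would also need a timer to flush when no new elements arrive; the sketch only checks the delay on `push`.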
