Dataflow
Dataflow is a data processing library, primarily for machine learning. It provides efficient pipeline primitives to build a directed acyclic dataflow graph, and a dataloader to run the graph in a separate thread. It also provides common tokenizers and batching tools to work with textual data.
Usage
To build a pipeline, first start with a loader Node:
use RandomLoader;
The RandomLoader by default loads individual lines randomly from files. Next add a transformation to it with the add_fn() function:
let pipeline = new
.add_fn;
This creates a new Stateless Node, which just runs the closure on the data. This closure takes in a single datapoint and outputs a single datapoint. If we want to do batch processing, we can use .add_batch_fn(), which takes a closure that can process a batch at a time.
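To make the per-item versus per-batch distinction concrete, here is a minimal, self-contained sketch. It does not use the Dataflow API: `apply_fn` and `apply_batch_fn` are hypothetical helpers that only stand in for what .add_fn() and .add_batch_fn() are described as doing above.

```rust
/// Hypothetical stand-in for a per-item stage:
/// the closure sees one datapoint at a time.
fn apply_fn<I, O>(batch: Vec<I>, f: impl Fn(I) -> O) -> Vec<O> {
    batch.into_iter().map(f).collect()
}

/// Hypothetical stand-in for a batch stage:
/// the closure sees the whole batch at once.
fn apply_batch_fn<I, O>(batch: Vec<I>, f: impl Fn(Vec<I>) -> Vec<O>) -> Vec<O> {
    f(batch)
}

/// Runs both styles on the same two lines.
fn greet_then_pad() -> Vec<String> {
    let lines = vec!["world".to_string(), "hi".to_string()];

    // Per-item: each line is transformed independently.
    let greeted = apply_fn(lines, |line| format!("Hello {}", line));

    // Per-batch: padding to the longest line needs to look at every
    // element, which a per-item closure cannot do.
    apply_batch_fn(greeted, |batch| {
        let width = batch.iter().map(|s| s.len()).max().unwrap_or(0);
        batch
            .into_iter()
            .map(|s| format!("{:<width$}", s, width = width))
            .collect::<Vec<String>>()
    })
}
```

Calling `greet_then_pad()` returns "Hello world" and "Hello hi" padded with three trailing spaces so both lines have the same length.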
Now that we've added "Hello " to every line, let's create a Stateful Node to hold a Tokenizer and make it tokenize our data:
use Stateful;
// Our tokenizer
let tokenizer = load;
// Our pipeline
let pipeline = new
.add_fn
.add_node;
Great! Now our data gets efficiently tokenized as a batch.
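The idea behind a stateful stage can be sketched without the library: the stage owns some state that is built once (here a tokenizer) and reuses it for every batch. Everything in this sketch is hypothetical, including `WhitespaceTokenizer` and `TokenizeStage`; it is not the crate's Stateful Node or its tokenizers.

```rust
/// Hypothetical tokenizer used only for this sketch: splits on whitespace.
struct WhitespaceTokenizer {
    lowercase: bool,
}

impl WhitespaceTokenizer {
    fn tokenize(&self, text: &str) -> Vec<String> {
        text.split_whitespace()
            .map(|t| {
                if self.lowercase {
                    t.to_lowercase()
                } else {
                    t.to_string()
                }
            })
            .collect()
    }
}

/// Hypothetical stateful stage: the tokenizer is created once and then
/// shared by every call to `process`.
struct TokenizeStage {
    tokenizer: WhitespaceTokenizer,
}

impl TokenizeStage {
    /// Tokenizes a whole batch of lines using the stored tokenizer.
    fn process(&self, batch: Vec<String>) -> Vec<Vec<String>> {
        batch
            .iter()
            .map(|line| self.tokenizer.tokenize(line))
            .collect()
    }
}

/// Builds the stage once and pushes a small batch through it.
fn tokenize_batch() -> Vec<Vec<String>> {
    let stage = TokenizeStage {
        tokenizer: WhitespaceTokenizer { lowercase: true },
    };
    stage.process(vec!["Hello World".to_string(), "Hello Rust".to_string()])
}
```

Keeping the tokenizer inside the stage means any expensive setup happens once rather than once per batch.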
Loader Nodes
So far it seems we've only used two types of Nodes, Stateless and Stateful (Stateless was generated when we used .add_fn()). Actually, we used three, because RandomLoader is a Node as well! It takes a Vec<()> as input, which is what the pipeline starts with, and produces a Vec of data to send through the pipeline.
Custom Nodes
In fact, you can implement your own Nodes as well by implementing the Node trait! Just implement fn process(Vec<Input>) -> Vec<Output> in the trait, and optionally fn reset(&mut self), which gets called at the beginning of an epoch, and fn data_remaining(&self) -> usize, which should return how much data remains available to the node (the number of lines we haven't loaded yet for RandomLoader, or usize::MAX for a non-loader Node). With that, you have your own Node to integrate into the pipeline!
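As a rough illustration of the three methods described above, here is a self-contained sketch. The `Node` trait below is a local stand-in written for this example, so its exact shape (associated types, the `&mut self` receiver on `process`, the default method bodies) is an assumption and may differ from the crate's real trait. `LineLoader` and `Uppercase` are hypothetical nodes.

```rust
/// Local stand-in for the trait described above (assumed shape).
trait Node {
    type Input;
    type Output;

    fn process(&mut self, input: Vec<Self::Input>) -> Vec<Self::Output>;

    /// Called at the beginning of an epoch; does nothing by default.
    fn reset(&mut self) {}

    /// Non-loader nodes never run out of data.
    fn data_remaining(&self) -> usize {
        usize::MAX
    }
}

/// Hypothetical loader: emits one stored line per `()` it receives.
struct LineLoader {
    lines: Vec<String>,
    cursor: usize,
}

impl Node for LineLoader {
    type Input = ();
    type Output = String;

    fn process(&mut self, input: Vec<()>) -> Vec<String> {
        // Hand out at most as many lines as were requested.
        let end = (self.cursor + input.len()).min(self.lines.len());
        let out = self.lines[self.cursor..end].to_vec();
        self.cursor = end;
        out
    }

    fn reset(&mut self) {
        self.cursor = 0;
    }

    fn data_remaining(&self) -> usize {
        self.lines.len() - self.cursor
    }
}

/// Hypothetical non-loader node: upper-cases every line.
struct Uppercase;

impl Node for Uppercase {
    type Input = String;
    type Output = String;

    fn process(&mut self, input: Vec<String>) -> Vec<String> {
        input.into_iter().map(|s| s.to_uppercase()).collect()
    }
}

/// Loads two of three lines, transforms them, then resets the loader.
/// Returns (batch, lines left before reset, lines left after reset).
fn run_epoch() -> (Vec<String>, usize, usize) {
    let mut loader = LineLoader {
        lines: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        cursor: 0,
    };
    let mut upper = Uppercase;

    let batch = upper.process(loader.process(vec![(); 2]));
    let left = loader.data_remaining();
    loader.reset();
    (batch, left, loader.data_remaining())
}
```

Here the loader plays the role described for RandomLoader: it receives a Vec<()> whose length says how many items are wanted, and reports how many lines it has not yet handed out.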
Dataloader
Now that we've built this cool pipeline, what can we do with it? Well, for starters, we could simply call process() and feed in some data, but let's do something cooler. Let's put it in a Dataloader and use it in an ML training loop:
// Make the dataloader
let mut dataloader = Dataloader; // We use 64 as the batch size
// Training loop
for example in &mut dataloader
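The general pattern of producing batches on a separate thread while the training loop consumes them can be sketched with the standard library alone. This is not the crate's Dataloader: `spawn_batches` is a hypothetical helper, the bounded channel capacity of 2 is an arbitrary choice, and keeping the final partial batch is an assumption that may not match the real behaviour. It also assumes `batch_size` is greater than zero.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: groups `examples` into batches on a worker thread
/// and hands them to the caller through a bounded channel.
fn spawn_batches<T: Send + 'static>(examples: Vec<T>, batch_size: usize) -> Receiver<Vec<T>> {
    // A small bound keeps the worker from running far ahead of the consumer.
    let (tx, rx) = sync_channel(2);

    thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        for example in examples {
            batch.push(example);
            if batch.len() == batch_size {
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                // Stop early if the consumer has gone away.
                if tx.send(full).is_err() {
                    return;
                }
            }
        }
        // This sketch keeps the final, possibly smaller, batch.
        if !batch.is_empty() {
            let _ = tx.send(batch);
        }
    });

    rx
}

/// Consumes every batch on the calling thread and records its size.
fn batch_sizes() -> Vec<usize> {
    let rx = spawn_batches((0..10).collect::<Vec<u32>>(), 4);
    // The iterator ends once the worker finishes and drops its sender.
    rx.iter().map(|batch| batch.len()).collect()
}
```

With ten examples and a batch size of 4, `batch_sizes()` yields 4, 4 and 2.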
To Do:
- Make dataloader use a multiqueue instead of draining all examples into buffer on main thread
- Make tokenizer loading more efficient
- Make auto-parallel pipeline Node using rayon
- Clear up any discrepancies between batch and single functions (like in Stateful)