Crate wingfoil

§Wingfoil

Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.

Wingfoil is a lingua franca of stream processing, making it easy to receive, process, and distribute streaming data.

§Features

  • Fast: Ultra-low latency and high throughput with an efficient DAG-based execution engine.
  • Simple and obvious to use: Define your graph of calculations; Wingfoil manages its execution.
  • Backtesting: Replay historical data to backtest and optimise strategies.
  • Async/Tokio: Seamless integration that lets you leverage async at your graph edges.
  • Multi-threading: distribute graph execution across cores.

§Quick Start

use wingfoil::*;
use std::time::Duration;
fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {:}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period * 3));
}

This output is produced:

hello, world 1
hello, world 2
hello, world 3

§Graph Execution

Wingfoil abstracts away the details of how to co-ordinate the calculation of your application, parts of which may be executing at different frequencies. Only the nodes that actually require cycling are executed, which allows wingfoil to scale efficiently to very large graphs. Consider the following example:

use wingfoil::*;
use std::time::Duration;
 
fn main() {
    let period = Duration::from_millis(10);
    let source = ticker(period).count(); // 1, 2, 3 etc
    let is_even = source.map(|i| i % 2 == 0);
    let odds = source
        .filter(is_even.not())
        .map(|i| format!("{:} is odd", i));
    let evens = source
        .filter(is_even)
        .map(|i| format!("{:} is even", i));
    merge(vec![odds, evens])
        .print()
        .run(
            RunMode::HistoricalFrom(NanoTime::ZERO),
            RunFor::Duration(period * 5),
        );
}

This output is produced:

1 is odd
2 is even
3 is odd
4 is even
5 is odd
6 is even

We can visualise the graph like this:

diagram
The input and output nodes tick 6 times each and the evens and odds nodes tick 3 times.
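
The tick counts above can be checked with a small model written in plain Rust. This is only a sketch of the idea that a node is cycled only when one of its inputs ticks; it does not use the wingfoil API, and `TickCounts` and `simulate` are hypothetical names introduced for illustration.

```rust
// Plain-Rust model of selective node execution. It does not use wingfoil;
// `TickCounts` and `simulate` are hypothetical names for this sketch.

#[derive(Debug, Default, PartialEq)]
struct TickCounts {
    source: u32,
    odds: u32,
    evens: u32,
    output: u32,
}

/// Feeds the values 1..=n through the odd/even graph and counts how often
/// each node would be cycled.
fn simulate(n: u64) -> TickCounts {
    let mut counts = TickCounts::default();
    for i in 1..=n {
        // The source ticks on every engine cycle.
        counts.source += 1;
        // Only the branch whose filter passed is cycled.
        if i % 2 == 0 {
            counts.evens += 1;
        } else {
            counts.odds += 1;
        }
        // Exactly one branch ticked, so the merged output ticks once.
        counts.output += 1;
    }
    counts
}

fn main() {
    println!("{:?}", simulate(6));
}
```

In this model each branch does half the work of the source, which is the property that lets a large graph stay cheap when only a few inputs tick.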

§Historical vs RealTime

Time is a first-class citizen in wingfoil. Engine time is measured in nanoseconds from the UNIX epoch and represented by a NanoTime.

In this example we compare and contrast RealTime vs Historical RunMode. RealTime is used for production deployment. Historical is used for development, unit-testing, integration-testing and back-testing.

use wingfoil::*;
use std::time::Duration;
use log::Level::Info;

pub fn main() {
    env_logger::init();
    for run_mode in vec![
        RunMode::RealTime,
        RunMode::HistoricalFrom(NanoTime::ZERO)
    ] {
        println!("\nUsing RunMode::{:?}", run_mode);
        ticker(Duration::from_secs(1))
            .count()
            .logged("tick", Info)
            .run(run_mode, RunFor::Cycles(3));
    }
}

This output is produced:

Using RunMode::RealTime
[17:34:46Z INFO wingfoil] 0.000_001 tick 1
[17:34:47Z INFO wingfoil] 1.000_131 tick 2
[17:34:48Z INFO wingfoil] 2.000_381 tick 3

Using RunMode::HistoricalFrom(NanoTime(0))
[17:34:48Z INFO  wingfoil] 0.000_000 tick 1
[17:34:48Z INFO  wingfoil] 1.000_000 tick 2
[17:34:48Z INFO  wingfoil] 2.000_000 tick 3

In RealTime mode the log statements are written every second. In Historical mode the log statements are written immediately. In both cases engine time advances by 1 second between each tick.
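
To make the behaviour of engine time in a historical run concrete, here is a minimal sketch in plain Rust. It assumes that a historical clock simply jumps to the next scheduled tick instead of sleeping. The functions `historical_tick_times` and `format_engine_time` are hypothetical helpers, not part of wingfoil, and the formatting only mirrors the `seconds.millis_micros` layout seen in the log output above.

```rust
use std::time::Duration;

/// Engine times, in nanoseconds, at which a ticker with the given period
/// fires during a historical-style run. Nothing sleeps: the clock is
/// advanced directly to each scheduled time.
fn historical_tick_times(start_nanos: u64, period: Duration, cycles: u64) -> Vec<u64> {
    let period_nanos = period.as_nanos() as u64;
    (0..cycles).map(|i| start_nanos + i * period_nanos).collect()
}

/// Formats nanoseconds as `seconds.millis_micros` (an assumed layout that
/// imitates the log lines shown above).
fn format_engine_time(nanos: u64) -> String {
    let secs = nanos / 1_000_000_000;
    let micros = (nanos % 1_000_000_000) / 1_000;
    format!("{}.{:03}_{:03}", secs, micros / 1_000, micros % 1_000)
}

fn main() {
    let times = historical_tick_times(0, Duration::from_secs(1), 3);
    for (i, t) in times.iter().enumerate() {
        println!("{} tick {}", format_engine_time(*t), i + 1);
    }
}
```

A real-time run would instead read the wall clock, which is why its timestamps in the log drift by a few hundred microseconds.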

§Order Book Example

In this example, we load a CSV file of limit orders for the AAPL stock ticker trading on the NASDAQ exchange. The data is sourced from lobsterdata samples.

We use the coincidentally named lobster Rust library to maintain an order book over time.

Trades and two-way prices are derived, exported back out to CSV and plotted below.

The frequencies of the inputs and outputs are all different to each other.

diagram

In addition to the CSV output, we also get a performance summary and a pretty-print of the graph. One hour’s worth of data was processed in 287 milliseconds.

8 nodes wired in 10.326µs
Completed 91998 cycles in 287.125397ms. 3.12µs average.
[00] CsvReaderStream
[01]   FilterStream
[02]      MapStream
[03]        MapStream
[04]          DistinctStream
[05]            CsvWriterNode
[06]        MapStream
[07]          CsvVecWriterNode

use std::cell::RefCell;
use wingfoil::adapters::csv_streams::*;
use wingfoil::{Graph, NanoTime, RunFor, RunMode, StreamOperators, TupleStreamOperators};



pub fn main() {
    env_logger::init();
    let book = RefCell::new(lobster::OrderBook::default());
    let source_path = "examples/lobster/data/aapl.csv";
    let fills_path = "examples/lobster/data/fills.csv";
    let prices_path = "examples/lobster/data/prices.csv";
    // convert seconds since midnight into a NanoTime
    let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
    let (fills, prices) = csv_read_vec(source_path, get_time, true)
        .map(move |chunk| process_orders(chunk, &book))
        .split();
    let prices_export = prices
        .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
        .map(|price| price.unwrap())
        .distinct()
        .csv_write(prices_path);
    let fills_export = fills.csv_write_vec(fills_path);
    let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
    let run_for = RunFor::Forever;
    Graph::new(vec![prices_export, fills_export], run_mode, run_for)
        .print()
        .run()
        .unwrap();
}

See the order book example for more details.

§async / tokio integration

In this example, we demonstrate wingfoil’s integration with tokio / async. This makes building IO adapters at the graph edge a breeze, giving the best of the sync and async worlds.

Async streams are a useful abstraction for IO, but they are not as powerful or as easy to use as wingfoil for implementing complex business logic. Async uses an implicit, depth-first graph execution strategy. In contrast, wingfoil’s explicit, breadth-first graph execution algorithm is easier to reason about, encourages re-use and provides a more structured and productive development environment. Wingfoil’s explicit modelling of time, with support for historical and realtime modes, is also a huge win over async streams in terms of productivity and the ability to backtest strategies.

The key methods here are produce_async which maps an async futures stream into a wingfoil stream and consume_async which takes a wingfoil stream and consumes it with a supplied async closure.

This hybrid sync-async approach enforces a clear separation between IO and business logic, which is often hard to maintain in async-oriented systems.

use async_stream::stream;
use std::time::Duration;
use std::pin::Pin;
use futures::StreamExt;
use wingfoil::*;

let period = Duration::from_millis(10);
let run_for = RunFor::Duration(period * 5);
let run_mode = RunMode::RealTime;
let producer = async move || {
    stream! {
        for i in 0.. {
            tokio::time::sleep(period).await; // simulate waiting IO
            let time = NanoTime::now();
            yield (time, i * 10);
        }
    }
};

let consumer = async move |mut source: Pin<Box<dyn FutStream<u32>>>| {
    while let Some((time, value)) = source.next().await {
        println!("{time:?}, {value:?}");
    }
};

produce_async(producer)
    .logged("on-graph", log::Level::Info)
    .collapse()
    .consume_async(Box::new(consumer))
    .run(run_mode, run_for);

See the async example for more details.

§Breadth-First Graph Execution

In this example we illustrate the power of wingfoil’s breadth-first graph execution algorithm and compare it with ReactiveX’s depth-first approach. Depth-first execution is also problematic for async streams.

Also note that wingfoil’s breadth-first approach, by construction, eliminates “reactive glitches” (potential logic defects due to inconsistent intermediate state). See StackOverflow and Wikipedia for more details.

In wingfoil we build an example with a depth of 127 branch / recombine operations:

use wingfoil::*;

fn main(){
    let mut source = constant(1_u128);
    for _ in 0..127 {
        source = add(&source, &source);
    }
    let cycles = source.count();
    cycles.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
        .unwrap();
    println!("cycles {:?}", cycles.peek_value());
    println!("value {:?}", source.peek_value());
}

It produces the correct output of 2^127 in 1 engine cycle that takes less than half a millisecond to complete.

384 nodes wired in 212.904µs
Completed 1 cycles in 463.793µs. 463.684µs average.
cycles 1
value 170141183460469231731687303715884105728

If we try to implement the same logic in ReactiveX, we quickly realise that it is infeasible. The issue is that each branch / recombine operation doubles the number of downstream ticks, resulting in an explosive O(2^N) time complexity. Extrapolating from smaller depths, we estimate that it would take of the order of 10^24 years to run the example with a depth of 127. See the breadth first example for more details.
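
The difference in complexity can be reproduced with a toy model in plain Rust. The sketch below uses neither wingfoil nor ReactiveX. It only counts how many ticks reach the final node when every tick is pushed down both edges of each branch / recombine stage (depth-first), and how many node evaluations are needed when each stage is evaluated once per cycle (breadth-first). The function names are hypothetical.

```rust
/// Depth-first push: a tick arriving at a stage is forwarded along both of
/// its outgoing edges, so the number of ticks doubles at every stage.
/// Returns how many ticks reach the final node.
fn depth_first_ticks(depth: u32) -> u64 {
    fn push(level: u32, depth: u32, ticks: &mut u64) {
        if level == depth {
            *ticks += 1;
            return;
        }
        push(level + 1, depth, ticks); // left edge
        push(level + 1, depth, ticks); // right edge
    }
    let mut ticks = 0;
    push(0, depth, &mut ticks);
    ticks
}

/// Breadth-first evaluation: each stage is evaluated exactly once per cycle,
/// after its inputs are up to date. Returns (final value, evaluations).
fn breadth_first(depth: u32) -> (u128, u32) {
    let mut value: u128 = 1;
    let mut evaluations = 0;
    for _ in 0..depth {
        value = value + value; // add(&source, &source)
        evaluations += 1;
    }
    (value, evaluations)
}

fn main() {
    // A depth of 20 is already about a million ticks in the depth-first model.
    println!("depth-first ticks at depth 20: {}", depth_first_ticks(20));
    let (value, evaluations) = breadth_first(127);
    println!("breadth-first: value {} in {} evaluations", value, evaluations);
}
```

The breadth-first model needs one evaluation per stage, so its cost grows linearly with depth while the depth-first count grows as 2^N.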

§RFQ Example

In this example we demonstrate:

  • async / tokio integration
  • demux pattern - to map
  • messaging
  • realtime vs historical execution

See the rfq example for more details.

§Multithreading

Wingfoil supports multi-threading to distribute workloads across cores. The approach is to compose sub-graphs, each running in its own dedicated thread.
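
As a rough illustration of the idea, the following sketch runs a piece of work on a dedicated thread and hands its results back over a channel, using only the Rust standard library. It is not wingfoil's implementation; `run_sub_graph` is a hypothetical name and the squaring step stands in for whatever the sub-graph computes.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a stand-in "sub-graph" (here: squaring each input) on its own
/// thread and collects the values it emits on the calling thread.
fn run_sub_graph(inputs: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        for x in inputs {
            // Emit each result to the parent graph's thread.
            tx.send(x * x).expect("receiver should still be alive");
        }
        // `tx` is dropped here, which closes the channel.
    });
    // Iterating the receiver ends once the sender has been dropped.
    let results: Vec<u64> = rx.iter().collect();
    worker.join().expect("worker thread panicked");
    results
}

fn main() {
    println!("{:?}", run_sub_graph(vec![1, 2, 3]));
}
```

With a single producer the channel preserves ordering, so the parent sees values in the order the worker emitted them.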

§Messaging

Wingfoil is agnostic to messaging. We plan to add support for shared memory IPC adapters such as Aeron for ultra-low latency use cases, ZeroMQ for low-latency server-to-server communication, and Kafka for high-latency, fault-tolerant messaging.

§Graph Dynamism

Some use cases require graph-dynamism - consider for example Request for Quote (RFQ) markets, where the graph needs to be able to adapt to an incoming RFQ. Conceptually this could be done by changing the shape of the graph at run time. However, we take a simpler approach by virtualising this, following a demux pattern.
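
A demux pattern of this kind can be sketched in plain Rust as a fixed pool of output slots plus a map from key to slot. This is an assumption-laden illustration and not wingfoil's `DemuxMap`: the names `Demux`, `Routed`, `route` and `close` are hypothetical, and the overflow and close behaviour only follows the descriptions of `Overflow` and `DemuxEvent` in the item listing below.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Where a value for a given key was routed.
#[derive(Debug, PartialEq)]
enum Routed {
    Slot(usize), // index of the pre-wired child stream
    Overflow,    // no free slot: capacity exceeded
}

/// Hypothetical fixed-capacity demultiplexer. The graph shape never
/// changes; keys are assigned to a fixed set of pre-wired slots.
struct Demux<K> {
    slots: HashMap<K, usize>,
    free: Vec<usize>,
}

impl<K: Eq + Hash + Clone> Demux<K> {
    fn new(capacity: usize) -> Self {
        // Reverse so that `pop` hands out slot 0 first.
        Demux { slots: HashMap::new(), free: (0..capacity).rev().collect() }
    }

    /// Returns the slot already assigned to `key`, or assigns a free one.
    fn route(&mut self, key: &K) -> Routed {
        if let Some(&slot) = self.slots.get(key) {
            return Routed::Slot(slot);
        }
        match self.free.pop() {
            Some(slot) => {
                self.slots.insert(key.clone(), slot);
                Routed::Slot(slot)
            }
            None => Routed::Overflow,
        }
    }

    /// Releases the slot for `key` so that it can be reused.
    fn close(&mut self, key: &K) {
        if let Some(slot) = self.slots.remove(key) {
            self.free.push(slot);
        }
    }
}

fn main() {
    let mut demux = Demux::<&str>::new(2);
    println!("{:?}", demux.route(&"rfq-1"));
    println!("{:?}", demux.route(&"rfq-2"));
    println!("{:?}", demux.route(&"rfq-3"));
    demux.close(&"rfq-1");
    println!("{:?}", demux.route(&"rfq-3"));
}
```

Because slots are recycled when a key is closed, a bounded number of pre-wired child graphs can serve an unbounded sequence of RFQs.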

§Performance

We take performance seriously, and ongoing work is focused on making Wingfoil even closer to a zero-cost abstraction. Currently, the overhead of cycling a node in the graph is less than 10 nanoseconds.

For best performance we recommend using cheaply cloneable types:

  • For small strings: arraystring
  • For small vectors: tinyvec
  • For larger or heap-allocated types:
    • Use Rc<T> for single threaded contexts.
    • Use Arc<T> for multithreaded contexts.
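
The reason `Rc<T>` and `Arc<T>` count as cheaply cloneable is that cloning copies a pointer and bumps a reference count rather than copying the data. The short sketch below shows this with the standard library only; `share_rc` and `sum_on_thread` are hypothetical helper names.

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

/// Wraps `data` in an `Rc` and returns two handles to the same allocation.
fn share_rc(data: Vec<u64>) -> (Rc<Vec<u64>>, Rc<Vec<u64>>) {
    let first = Rc::new(data);
    let second = Rc::clone(&first); // pointer copy, the Vec is not duplicated
    (first, second)
}

/// Sums the shared data on a worker thread. `Arc` is required here because
/// `Rc` cannot be sent across threads.
fn sum_on_thread(data: Arc<Vec<u64>>) -> u64 {
    let worker_copy = Arc::clone(&data);
    thread::spawn(move || worker_copy.iter().sum::<u64>())
        .join()
        .expect("worker thread panicked")
}

fn main() {
    let (a, b) = share_rc((0..1_000_000).collect());
    println!("same allocation: {}", Rc::ptr_eq(&a, &b));
    println!("strong count: {}", Rc::strong_count(&a));
    println!("sum: {}", sum_on_thread(Arc::new(vec![1, 2, 3])));
}
```

`Arc` pays for atomic reference counting, so `Rc` is the cheaper choice whenever the value stays on one thread.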

Modules§

adapters
A library of input and output adapters

Structs§

CallBackStream
A queue of values that are emitted at specified times. Useful for unit tests. Can also be used to feed stream output back into the Graph as input on later cycles.
DemuxMap
Maintains a map from key K to output node, used to demux a source. Used by StreamOperators::demux_it.
Graph
Engine for co-ordinating execution of Nodes
GraphState
Maintains the parts of the graph state that are accessible to Nodes.
NanoTime
A time in nanoseconds since the UNIX epoch.
Overflow
Represents a Stream of values that failed to be demuxed because the demux capacity was exceeded. Output of StreamOperators::demux and StreamOperators::demux_it.
UpStreams
The graph can ask a Node what its upstream sources are. The Node replies with an UpStreams for passive and active sources. All sources are wired upstream. Active Nodes trigger Node.cycle() when they tick. Passive Nodes do not.

Enums§

DemuxEvent
A message used to signal that a demuxed child stream can be closed. Used by StreamOperators::demux and StreamOperators::demux_it.
RunFor
Defines how long the graph should run for. Can be a Duration, number of cycles or forever.
RunMode
Whether the Graph should run in RealTime or Historical mode.

Traits§

AsNode
Used to cast Rc<dyn Stream> to Rc<dyn Node>
AsStream
Used to cast an Rc of a concrete stream into an Rc of dyn Stream.
BenchBuilder
A function that accepts a trigger node and wires downstream logic to be benchmarked.
FutStream
A convenience alias for futures::Stream with items of type (NanoTime, T). Used by StreamOperators::consume_async.
IntoNode
Used to consume a concrete MutableNode and return an Rc<dyn Node>.
IntoStream
Used to consume a concrete Stream and return an Rc<dyn Stream>.
MutableNode
Implement this trait to create your own Node.
Node
A wiring point in the Graph.
NodeOperators
A trait containing operators that can be applied to Nodes. Used to support method chaining syntax.
Stream
A Node which has some state that can be peeked at.
StreamOperators
A trait containing operators that can be applied to Streams. Used to support method chaining syntax.
StreamPeek
The trait through which a Stream’s current value can be peeked at.
StreamPeekRef
A trait through which a reference to a Stream’s value can be peeked at.

Functions§

add
Returns a Stream that adds both of its source Streams. Ticks when either of its sources ticks.
add_bench
Used to add wingfoil bench to criterion.
always
Produces a Node that ticks on every engine cycle.
bimap
Maps two Streams into one using the supplied function. Ticks when either of its sources ticks.
combine
Collects a Vec of Streams into a Stream of Vec.
constant
Returns a stream that ticks once with the specified value, on the first cycle.
merge
Returns a stream that merges its sources into one. Ticks when any of its sources ticks. If more than one source ticks at the same time, the first one that was supplied is used.
produce_async
Create a Stream from futures::Stream
producer
Creates a Stream emitting values on this thread but produced on a worker thread.
ticker
Returns a Node that ticks with the specified period.