wingfoil 4.0.0

graph based stream processing framework

Wingfoil

Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.

Wingfoil simplifies receiving, processing and distributing streaming data across your entire stack.

Features

  • Fast: Ultra-low latency and high throughput with an efficient DAG-based execution engine.
  • Simple and obvious to use: Define your graph of calculations; Wingfoil manages its execution.
  • Multi-language: currently available as a Rust crate and, in beta, as a Python package, with plans to add WASM/JavaScript/TypeScript support.
  • Backtesting: Replay historical data to backtest and optimise strategies.
  • Async/Tokio: seamless integration that lets you use async at your graph edges.
  • Multi-threading: distribute graph execution across cores.
  • I/O Adapters: production-ready KDB+ integration for tick data, CSV, etcd key-value store, FIX protocol (FIX 4.4 with TLS), ZeroMQ pub/sub messaging (beta), Prometheus metrics exporter, OpenTelemetry OTLP push, etc.
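
To make the "define your graph, let the engine run it" idea concrete, here is a toy DAG evaluator written with the standard library only. It is not Wingfoil's engine or API: `Node`, `run_cycle` and `diamond` are hypothetical names, and the sketch assumes only that nodes are evaluated in topological order so that each node sees its upstream values from the same cycle.

```rust
// Toy illustration only: a hypothetical DAG evaluator, not Wingfoil's engine.

/// A node holds the indices of its upstream nodes and a function that
/// combines their current values into this node's value.
struct Node {
    upstreams: Vec<usize>,
    func: fn(&[i64]) -> i64,
}

/// Evaluates every node once. Nodes must be stored in topological order
/// (each node's upstreams have smaller indices), so a single forward pass
/// visits every node after all of its inputs.
fn run_cycle(nodes: &[Node], source_value: i64) -> Vec<i64> {
    let mut values: Vec<i64> = Vec::with_capacity(nodes.len());
    for node in nodes {
        let inputs: Vec<i64> = if node.upstreams.is_empty() {
            vec![source_value] // source nodes read the external input
        } else {
            node.upstreams.iter().map(|&i| values[i]).collect()
        };
        values.push((node.func)(&inputs));
    }
    values
}

/// Builds a diamond: source -> (double, plus_one) -> sum.
fn diamond() -> Vec<Node> {
    vec![
        Node { upstreams: vec![], func: |xs| xs[0] },
        Node { upstreams: vec![0], func: |xs| xs[0] * 2 },
        Node { upstreams: vec![0], func: |xs| xs[0] + 1 },
        Node { upstreams: vec![1, 2], func: |xs| xs[0] + xs[1] },
    ]
}

fn main() {
    let graph = diamond();
    for tick in 1..=3 {
        let values = run_cycle(&graph, tick);
        println!("tick {tick}: sum node = {}", values[3]);
    }
}
```

The diamond shape is the interesting case: the stream is split into two branches and recombined, and the topological ordering ensures the final node runs only after both branches have updated.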

Quick Start

In this example we build a simple, linear pipeline with all nodes ticking in lock-step.

use wingfoil::*;
use std::time::Duration;
fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period * 3));
}

This output is produced:

hello, world 1
hello, world 2
hello, world 3
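
If the operators are new to you, the same data flow can be mimicked with a plain standard-library iterator. This sketch does not use Wingfoil and ignores timing entirely. It assumes only what the output above shows, namely that `count()` emits 1, 2, 3 on successive ticks. `hello_lines` is a hypothetical helper.

```rust
/// Mimics ticker -> count -> map for a fixed number of ticks,
/// without any real-time scheduling.
fn hello_lines(ticks: u64) -> Vec<String> {
    (1..=ticks)
        .map(|i| format!("hello, world {}", i))
        .collect()
}

fn main() {
    for line in hello_lines(3) {
        println!("{line}");
    }
}
```

The difference in the real pipeline is that each value is produced when the ticker fires, once per `period`, rather than all at once.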

Order Book Example

Wingfoil lets you easily wire up complex business logic, splitting and recombining streams, and altering the frequency of data. I/O adapters make it easy to plug in real data sources and sinks. In this example we load a CSV of AAPL limit orders, maintain an order book using the lobster crate, derive trades and two-way prices, and export back to CSV — all in a few lines:

let book = RefCell::new(lobster::OrderBook::default());
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read("aapl.csv", get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write("prices.csv");
let fills_export = fills.csv_write("fills.csv");
Graph::new(vec![prices_export, fills_export], RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
    .print()
    .run()
    .unwrap();

Full example.
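
The price branch of this graph does two small things that are easy to check in isolation: it converts fractional seconds to nanoseconds, and it keeps only populated, changed prices. The sketch below reproduces both with the standard library. The `TwoWayPrice` fields are a hypothetical stand-in for the example's type, and treating `distinct()` as "drop consecutive repeats" is an assumption, not a statement about Wingfoil's implementation.

```rust
// Hypothetical stand-in for the example's TwoWayPrice type.
#[derive(Debug, Clone, Copy, PartialEq)]
struct TwoWayPrice {
    bid: f64,
    ask: f64,
}

/// Converts fractional seconds to whole nanoseconds, mirroring the
/// arithmetic inside the `get_time` closure.
fn seconds_to_nanos(seconds: f64) -> u64 {
    (seconds * 1e9) as u64
}

/// Drops `None` values, unwraps the rest, and removes consecutive repeats
/// (assumed here to be what filter_value + map + distinct achieve).
fn clean_prices(raw: &[Option<TwoWayPrice>]) -> Vec<TwoWayPrice> {
    let mut prices: Vec<TwoWayPrice> = raw.iter().flatten().copied().collect();
    prices.dedup();
    prices
}

fn main() {
    let a = TwoWayPrice { bid: 99.5, ask: 100.5 };
    let b = TwoWayPrice { bid: 99.75, ask: 100.5 };
    let raw = [None, Some(a), Some(a), None, Some(b)];
    println!("{:?}", clean_prices(&raw));
    println!("{}", seconds_to_nanos(34200.5));
}
```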

KDB+ Example

Define a typed struct, implement KdbDeserialize to map rows, and stream time-sliced queries directly into your graph:

#[derive(Debug, Clone, Default)]
struct Price {
    sym: Sym,
    mid: f64,
}

kdb_read::<Price, _>(
    conn,
    Duration::from_secs(10),
    |(t0, t1), _date, _iter| {
        format!(
            "select time, sym, mid from prices \
             where time >= (`timestamp$){}j, time < (`timestamp$){}j",
            t0.to_kdb_timestamp(),
            t1.to_kdb_timestamp(),
        )
    },
)
.logged("prices", Info)
.run(
    RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
    RunFor::Duration(Duration::from_secs(100)),
)?;

Full example.
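
The time-slicing idea can be sketched without a KDB+ connection. The code below assumes that the `Duration` argument is the slice width and that each slice is a half-open interval `[t0, t1)`, which is what the `>=` and `<` in the query suggest. `time_slices` and `slice_query` are hypothetical helpers, and timestamps are plain nanosecond integers rather than Wingfoil's `NanoTime`.

```rust
/// Splits [start, end) into consecutive half-open slices of `step` nanoseconds.
fn time_slices(start: i64, end: i64, step: i64) -> Vec<(i64, i64)> {
    assert!(step > 0, "step must be positive");
    let mut slices = Vec::new();
    let mut t0 = start;
    while t0 < end {
        let t1 = (t0 + step).min(end); // clamp the final slice to `end`
        slices.push((t0, t1));
        t0 = t1;
    }
    slices
}

/// Formats the query for one slice, using the same template as the example.
fn slice_query(t0: i64, t1: i64) -> String {
    format!(
        "select time, sym, mid from prices \
         where time >= (`timestamp$){}j, time < (`timestamp$){}j",
        t0, t1,
    )
}

fn main() {
    let second = 1_000_000_000_i64;
    // 100 seconds of history in 10-second slices, as in the example above.
    for (t0, t1) in time_slices(0, 100 * second, 10 * second) {
        println!("{}", slice_query(t0, t1));
    }
}
```

Half-open slices mean a row whose timestamp falls exactly on a boundary is returned by one query only, so no tick is replayed twice.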

etcd Example

Watch a key prefix, transform values, and write results back — all in a declarative graph:

use wingfoil::adapters::etcd::*;
use wingfoil::*;

let conn = EtcdConnection::new("http://localhost:2379");

let round_trip = etcd_sub(conn.clone(), "/source/")
    .map(|burst| {
        burst.into_iter().map(|event| {
            let upper = event.entry.value_str().unwrap_or("").to_uppercase().into_bytes();
            EtcdEntry { key: event.entry.key.replacen("/source/", "/dest/", 1), value: upper }
        })
        .collect::<Burst<EtcdEntry>>()
    })
    .etcd_pub(conn, None, true);

round_trip.run(RunMode::RealTime, RunFor::Cycles(3)).unwrap();

Full example.
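
The per-entry transformation inside the `map` closure can be tested on its own, without an etcd server. In this sketch `Entry` is a hypothetical stand-in for `EtcdEntry`, and falling back to an empty string on invalid UTF-8 is an assumption about what `value_str().unwrap_or("")` does.

```rust
// Hypothetical stand-in for the adapter's entry type: a key plus raw bytes.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    key: String,
    value: Vec<u8>,
}

/// Upper-cases the value (treating invalid UTF-8 as empty) and rewrites
/// the first "/source/" in the key to "/dest/".
fn transform(entry: &Entry) -> Entry {
    let text = std::str::from_utf8(&entry.value).unwrap_or("");
    Entry {
        key: entry.key.replacen("/source/", "/dest/", 1),
        value: text.to_uppercase().into_bytes(),
    }
}

fn main() {
    let input = Entry {
        key: "/source/greeting".to_string(),
        value: b"hello".to_vec(),
    };
    let output = transform(&input);
    println!("{} = {}", output.key, String::from_utf8_lossy(&output.value));
}
```

Because the output key lives under `/dest/`, the written entries fall outside the watched `/source/` prefix, so the graph does not react to its own writes.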

ZeroMQ Example

Publish a stream over ZMQ and subscribe from another process — cross-language compatible with the Python bindings:

// publisher
use std::time::Duration;
use wingfoil::adapters::zmq::ZeroMqPub;
use wingfoil::*;

ticker(Duration::from_millis(100))
    .count()
    .map(|n: u64| format!("{n}").into_bytes())
    .zmq_pub(7779, ())
    .run(RunMode::RealTime, RunFor::Forever)?;

// subscriber
use wingfoil::adapters::zmq::zmq_sub;
use wingfoil::*;

let (data, _status) = zmq_sub::<Vec<u8>>("tcp://127.0.0.1:7779")?;
// See wingfoil-python/examples/zmq/ for a Python subscriber
data.print()
    .run(RunMode::RealTime, RunFor::Forever)?;

Service discovery via etcd is also supported — see the etcd examples for details.

Full example.
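
The publisher above sends each counter value as decimal text in a byte vector. The sketch below shows that encoding and a matching decoder using only the standard library. `encode` and `decode` are hypothetical helpers, and the sketch covers the application payload only: any envelope or framing the adapter adds on the wire is not modelled here.

```rust
/// Encodes a counter value the way the publisher's `map` closure does:
/// decimal text as bytes.
fn encode(n: u64) -> Vec<u8> {
    format!("{n}").into_bytes()
}

/// Decodes a payload back into a counter value. Returns None if the bytes
/// are not valid UTF-8 or do not parse as a decimal u64.
fn decode(payload: &[u8]) -> Option<u64> {
    std::str::from_utf8(payload).ok()?.parse().ok()
}

fn main() {
    let payload = encode(42);
    println!("{:?} -> {:?}", payload, decode(&payload));
}
```

A text payload like this is easy to parse from any language, which is convenient when the subscriber is written in Python.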

FIX Protocol Example

Connect to a FIX 4.4 exchange (e.g. LMAX London Demo) over TLS, subscribe to market data, and process incoming messages — all as a streaming graph:

use std::time::Duration;
use wingfoil::adapters::fix::fix_connect_tls;
use wingfoil::*;

let fix = fix_connect_tls(
    "fix-marketdata.london-demo.lmax.com", 443,
    &username, "LMXBDM", Some(&password),
);

// Subscribe to EUR/USD — waits for LoggedIn, then sends the request.
let sub = fix.fix_sub(constant(vec!["4001".into()]));

let data_node = fix.data.logged("fix-data", Info).as_node();
let status_node = fix.status.logged("fix-status", Info).as_node();

Graph::new(
    vec![data_node, status_node, sub],
    RunMode::RealTime,
    RunFor::Duration(Duration::from_secs(60)),
)
.run()
.unwrap();

Run the self-contained loopback example (no external FIX engine needed):

RUST_LOG=info cargo run --example fix_loopback --features fix

Full examples.
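
For readers unfamiliar with the wire format, FIX messages are `tag=value` fields separated by the SOH byte (0x01), ending with a checksum field (tag 10). The sketch below parses fields and computes the standard checksum with the standard library only. It is independent of Wingfoil's FIX adapter: `parse_fields` and `checksum` are hypothetical helpers, and the sample message is illustrative, not captured from a real session.

```rust
const SOH: char = '\x01'; // FIX field delimiter

/// Splits a raw FIX message into (tag, value) pairs, skipping any field
/// that is empty or lacks a numeric tag.
fn parse_fields(raw: &str) -> Vec<(u32, &str)> {
    raw.split(SOH)
        .filter(|field| !field.is_empty())
        .filter_map(|field| {
            let (tag, value) = field.split_once('=')?;
            Some((tag.parse::<u32>().ok()?, value))
        })
        .collect()
}

/// Computes the standard FIX checksum: the sum of all bytes preceding the
/// trailing `10=` field, modulo 256, rendered as three digits.
fn checksum(body: &str) -> String {
    let sum: u32 = body.bytes().map(u32::from).sum();
    format!("{:03}", sum % 256)
}

fn main() {
    // Illustrative message: 35 is MsgType, 55 is Symbol.
    let body = format!("8=FIX.4.4{SOH}35=W{SOH}55=EUR/USD{SOH}");
    let message = format!("{body}10={}{SOH}", checksum(&body));
    for (tag, value) in parse_fields(&message) {
        println!("{tag} = {value}");
    }
}
```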

Telemetry Example

Export stream metrics to Grafana via Prometheus scraping (pull) or OpenTelemetry OTLP push — or both simultaneously:

use std::time::Duration;
use wingfoil::adapters::prometheus::PrometheusExporter;
use wingfoil::adapters::otlp::{OtlpConfig, OtlpPush};
use wingfoil::*;

let exporter = PrometheusExporter::new("0.0.0.0:9091");
exporter.serve()?;

let config = OtlpConfig {
    endpoint: "http://localhost:4318".into(),
    service_name: "my-app".into(),
};

let counter = ticker(Duration::from_secs(1)).count();
let prometheus_node = exporter.register("wingfoil_ticks_total", counter.clone());
let otlp_node = counter.otlp_push("wingfoil_ticks_total", config);

Graph::new(vec![prometheus_node, otlp_node], RunMode::RealTime, RunFor::Forever).run()?;

Full example.
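
When Prometheus scrapes an endpoint it expects the plain-text exposition format. The sketch below renders a single counter in that format with the standard library, to show roughly what a scrape of `wingfoil_ticks_total` could return. `render_counter` is a hypothetical helper, and the help text, the metric type and the absence of labels are assumptions; the exporter's actual output may differ.

```rust
/// Renders one counter sample in the Prometheus text exposition format:
/// a HELP line, a TYPE line, then the sample itself.
fn render_counter(name: &str, help: &str, value: u64) -> String {
    format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}\n")
}

fn main() {
    // Hypothetical help text; the metric name matches the example above.
    print!("{}", render_counter("wingfoil_ticks_total", "Ticks seen", 3));
}
```

With the pull model, Prometheus reads this text on its own schedule. With OTLP push, the application sends samples to the collector instead, which is why the two can run side by side on the same stream.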

Links

Get Involved!

We want to hear from you! Especially if you:

  • are interested in contributing
  • know of a project that wingfoil would be well-suited for
  • would like to request a feature or report a bug
  • have any feedback

Please do get in touch: