§Wingfoil
Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.
It ships with a growing library of production-ready I/O adapters covering tick stores, message buses, market protocols, and observability backends — so you can plug graphs into real data sources and sinks with a single line.
Wingfoil simplifies receiving, processing, distributing and monitoring streaming data across your entire stack.
§Features
- Fast: Ultra low latency and high throughput with an efficient DAG based execution engine.
- Backtesting: Replay historical data to backtest and optimise strategies.
- Simple and obvious to use: Define your graph of calculations; Wingfoil manages its execution.
- I/O Adapters: production-ready integrations for iceoryx2, KDB+, Kafka, Fluvio, FIX, ZeroMQ, etcd, Prometheus, OpenTelemetry, CSV, and more.
- Multi-language: currently available as a Rust crate, a Python package and a TypeScript/JavaScript client.
- Graph dynamism: rewire your graph in response to incoming data.
- Async/Tokio: seamless integration that lets you leverage async at the edges of your graph.
- Multi-threading: distribute graph execution across cores.
§Quick Start
In this example we build a simple, linear pipeline with all nodes ticking in lock-step.
```rust
use std::time::Duration;
use wingfoil::*;

fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period * 3))
        .unwrap();
}
```
This output is produced:
```text
hello, world 1
hello, world 2
hello, world 3
```
§Order Book Example
Wingfoil lets you easily wire up complex business logic, splitting and recombining streams, and modulating the frequency of data. I/O adapters make it easy to plug in real data sources and sinks. In this example we load a CSV of AAPL limit orders, maintain an order book using the lobster crate, derive trades and two-way prices, and export back to CSV — all in a few lines:
```rust
use std::cell::RefCell;
use wingfoil::*;

// Message, TwoWayPrice and process_orders come from the order book example (not shown here).
let book = RefCell::new(lobster::OrderBook::default());
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read("aapl.csv", get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write("prices.csv");
let fills_export = fills.csv_write("fills.csv");
Graph::new(vec![prices_export, fills_export], RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
    .print()
    .run()
    .unwrap();
```
This output is produced:
§More Examples
Short code snippets for each adapter live in the examples README. The examples below are all runnable — see each one’s README.md for setup and commands.
§Core concepts
| Example | Description |
|---|---|
| order_book | Load NASDAQ AAPL limit orders from CSV, maintain an order book, derive trades and two-way prices, export to CSV. |
| breadth_first | Why wingfoil’s BFS execution avoids the O(2^N) node explosion of naive depth-first DAGs. |
| run_mode | Swap RunMode::RealTime and RunMode::HistoricalFrom with the same graph wiring for backtesting. |
| async | Integrate Tokio async/await at graph edges (I/O adapters) while keeping the core graph synchronous. |
| threading | Distribute graph execution across worker threads with producer() / mapper(). |
| dynamic | Add and remove nodes at runtime. Includes demux, dynamic-group, and dynamic-manual variants. |
| tracing | Instrumentation modes (log, tracing, instruments) for event and span handling. |
| latency | Per-hop latency stamping with Traced<T, L> and LatencyReport, transported over iceoryx2. |
§I/O adapters
| Example | Description |
|---|---|
| kdb | KDB+ integration: time-sliced reads, cached reads (LRU file cache), and round-trip write/read/validate. |
| kafka | Kafka / Redpanda adapter: subscribe, transform, publish pipeline via rdkafka. |
| fluvio | Fluvio distributed streaming: subscribe, transform, publish pipeline. |
| fix | FIX 4.4 protocol: self-contained loopback, client, echo server, and live LMAX market data over TLS. |
| zmq | ZeroMQ pub/sub with direct addressing or etcd-based service discovery. |
| etcd | etcd key-value store adapter for sub/pub with transformation. |
| iceoryx2 | Zero-copy IPC over shared memory (spin, threaded, signaled polling modes). |
| aeron | Low-latency Aeron UDP/IPC transport: publish and subscribe to i64 values with spin and threaded polling modes. |
| web | WebSocket adapter streaming synthetic prices and receiving UI events. |
| telemetry | Metrics export via Prometheus scraping (pull) and OpenTelemetry OTLP (push). |
§Links
- Check out the examples
- Download from crates.io
- Read the documentation
- Review the benchmarks
- Download the wingfoil Python module from pypi.org
- Download the @wingfoil/client browser client from npmjs.com
§Get Involved!
We want to hear from you! Especially if you:
- are interested in contributing
- know of a project that wingfoil would be well-suited for
- would like to request a feature or report a bug
- have any feedback
Please do get in touch:
- ping us on Discord
- email us at hello@wingfoil.io
- submit an issue
- get involved in the discussion
§Graph Execution
Wingfoil abstracts away the details of co-ordinating the calculation of your application, parts of which may execute at different frequencies. Only the nodes that actually need to cycle are executed, which allows wingfoil to scale efficiently to very large graphs. Consider the following example:
```rust
use std::time::Duration;
use wingfoil::*;

fn main() {
    let period = Duration::from_millis(10);
    let source = ticker(period).count(); // 1, 2, 3 etc
    let is_even = source.map(|i| i % 2 == 0);
    let odds = source
        .filter(is_even.not())
        .map(|i| format!("{} is odd", i));
    let evens = source
        .filter(is_even)
        .map(|i| format!("{} is even", i));
    merge(vec![odds, evens])
        .print()
        .run(
            RunMode::HistoricalFrom(NanoTime::ZERO),
            RunFor::Duration(period * 5),
        )
        .unwrap();
}
```
This output is produced:
```text
1 is odd
2 is even
3 is odd
4 is even
5 is odd
6 is even
```
We can visualise the graph like this:
§Historical vs RealTime
Time is a first-class citizen in wingfoil. Engine time is measured in nanoseconds from the UNIX epoch and represented by a NanoTime.
In this example we compare and contrast RealTime vs Historical RunMode. RealTime is used for production deployment. Historical is used for development, unit-testing, integration-testing and back-testing.
```rust
use log::Level::Info;
use std::time::Duration;
use wingfoil::*;

pub fn main() {
    env_logger::init();
    for run_mode in [
        RunMode::RealTime,
        RunMode::HistoricalFrom(NanoTime::ZERO),
    ] {
        println!("\nUsing RunMode::{:?}", run_mode);
        ticker(Duration::from_secs(1))
            .count()
            .logged("tick", Info)
            .run(run_mode, RunFor::Cycles(3))
            .unwrap();
    }
}
```
This output is produced:
```text
Using RunMode::RealTime
[17:34:46Z INFO wingfoil] 0.000_001 tick 1
[17:34:47Z INFO wingfoil] 1.000_131 tick 2
[17:34:48Z INFO wingfoil] 2.000_381 tick 3

Using RunMode::HistoricalFrom(NanoTime(0))
[17:34:48Z INFO wingfoil] 0.000_000 tick 1
[17:34:48Z INFO wingfoil] 1.000_000 tick 2
[17:34:48Z INFO wingfoil] 2.000_000 tick 3
```
In RealTime mode the log statements are written once per second. In Historical mode they are written immediately. In both cases engine time advances by 1 second between ticks.
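The difference between the two modes can be sketched with the standard library alone. This is a minimal illustration under the assumption that the only behavioural difference is whether the engine waits for the wall clock before each tick; Mode and run_ticker are hypothetical names, not part of wingfoil's API.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for an engine run mode; this is not wingfoil's RunMode.
#[derive(Debug, Clone, Copy)]
enum Mode {
    RealTime,
    Historical,
}

/// Drives `cycles` ticks of a periodic timer and returns each tick's engine
/// time in nanoseconds from the start of the run.
fn run_ticker(mode: Mode, period: Duration, cycles: u32) -> Vec<u64> {
    let wall_start = Instant::now();
    let mut engine_times = Vec::new();
    for n in 0..cycles {
        // Engine time is computed identically in both modes.
        let engine_time = period * n;
        if let Mode::RealTime = mode {
            // RealTime: block until the wall clock catches up with engine time.
            let target = wall_start + engine_time;
            let now = Instant::now();
            if target > now {
                thread::sleep(target - now);
            }
        }
        // Historical: no waiting, so the next tick is processed immediately.
        engine_times.push(engine_time.as_nanos() as u64);
    }
    engine_times
}

fn main() {
    let period = Duration::from_millis(100);
    for mode in [Mode::RealTime, Mode::Historical] {
        let started = Instant::now();
        let times = run_ticker(mode, period, 3);
        println!("{mode:?}: engine times {times:?}, wall time {:?}", started.elapsed());
    }
}
```

Both modes produce identical engine times; only the wall time spent differs, which is why the same wiring can be backtested in Historical mode and deployed in RealTime mode.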
§Order Book Example
In this example, we load a CSV file of limit orders for AAPL stock trading on the NASDAQ exchange. The data is sourced from lobsterdata samples.
We use the coincidentally named lobster Rust library to maintain an order book over time.
Trades and two-way prices are derived, exported back out to CSV and plotted below.
The frequencies of the inputs and outputs all differ from one another.
In addition to the CSV output, we also get a performance summary and a pretty-print of the graph. One hour's worth of data was processed in 287 milliseconds.
```text
8 nodes wired in 10.326µs
Completed 91998 cycles in 287.125397ms. 3.12µs average.
[00] CsvReaderStream
[01] FilterStream
[02] MapStream
[03] MapStream
[04] DistinctStream
[05] CsvWriterNode
[06] MapStream
[07] CsvVecWriterNode
```
```rust
env_logger::init();
let book = RefCell::new(lobster::OrderBook::default());
let source_path = "examples/lobster/data/aapl.csv";
let fills_path = "examples/lobster/data/fills.csv";
let prices_path = "examples/lobster/data/prices.csv";
// map from seconds from midnight to NanoTime time
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read_vec(source_path, get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write(prices_path);
let fills_export = fills.csv_write_vec(fills_path);
let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
let run_for = RunFor::Forever;
Graph::new(vec![prices_export, fills_export], run_mode, run_for)
    .print()
    .run()
    .unwrap();
```
See the order book example for more details.
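The filter_value / map / distinct chain only exports a two-way price when one exists and when it differs from the previous one. A std-only sketch of that idea follows; ToyBook, TwoWayPrice and distinct_prices are hypothetical stand-ins (not the lobster crate's API), and the toy book only tracks resting quantity per price level, with no matching and therefore no fills.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the example's TwoWayPrice (prices in integer ticks).
#[derive(Debug, Clone, Copy, PartialEq)]
struct TwoWayPrice {
    bid: u64,
    ask: u64,
}

/// Toy price-level book: price -> resting quantity on each side.
#[derive(Default)]
struct ToyBook {
    bids: BTreeMap<u64, u64>,
    asks: BTreeMap<u64, u64>,
}

impl ToyBook {
    fn add(&mut self, is_bid: bool, price: u64, qty: u64) {
        let side = if is_bid { &mut self.bids } else { &mut self.asks };
        *side.entry(price).or_insert(0) += qty;
    }

    /// Best bid is the highest bid level, best ask the lowest ask level.
    /// Returns None until both sides have at least one level.
    fn top(&self) -> Option<TwoWayPrice> {
        let bid = *self.bids.keys().next_back()?;
        let ask = *self.asks.keys().next()?;
        Some(TwoWayPrice { bid, ask })
    }
}

/// Mirrors filter(is_some) followed by distinct: keeps only prices that
/// differ from the last one emitted.
fn distinct_prices(book: &mut ToyBook, orders: &[(bool, u64, u64)]) -> Vec<TwoWayPrice> {
    let mut out: Vec<TwoWayPrice> = Vec::new();
    for &(is_bid, price, qty) in orders {
        book.add(is_bid, price, qty);
        if let Some(p) = book.top() {
            if out.last() != Some(&p) {
                out.push(p);
            }
        }
    }
    out
}

fn main() {
    let mut book = ToyBook::default();
    // (is_bid, price in ticks, quantity)
    let orders = [(true, 100, 5), (false, 102, 3), (true, 99, 1), (true, 101, 2), (false, 103, 1)];
    for p in distinct_prices(&mut book, &orders) {
        println!("{p:?}");
    }
}
```

Five orders produce only two distinct prices here, which is the same reason the exported prices stream ticks less often than the input orders.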
§async / tokio integration
In this example, we demonstrate wingfoil’s integration with tokio / async. This makes building IO adapters at the graph edge a breeze, giving the best of sync and async worlds.
Async streams are a useful abstraction for IO but are not as powerful, or as easy to use, for implementing complex business logic as wingfoil. Async uses an implicit, depth-first graph execution strategy. In contrast, wingfoil’s explicit, breadth-first graph execution algorithm is easier to reason about, encourages re-use and provides a more structured and productive development environment. Wingfoil’s explicit modelling of time, with support for historical and real-time modes, is also a huge win over async streams in productivity and in the ability to backtest strategies.
The key methods here are produce_async, which maps an async futures stream into a wingfoil stream, and consume_async, which takes a wingfoil stream and consumes it with a supplied async closure.
This hybrid sync-async approach enforces a clear separation between IO and business logic, something that is often hard to achieve in async-oriented systems.
```shell
RUST_LOG=INFO cargo run --example async
```
```rust
use async_stream::stream;
use futures::StreamExt;
use std::pin::Pin;
use std::time::Duration;
use wingfoil::*;

let period = Duration::from_millis(10);
let run_for = RunFor::Duration(period * 5);
let run_mode = RunMode::RealTime;

// Producer: an async stream of (time, value) pairs, simulating IO at the graph edge.
let producer = move |_ctx: RunParams| async move {
    Ok(stream! {
        for i in 0.. {
            tokio::time::sleep(period).await; // simulate waiting IO
            let time = NanoTime::now();
            yield Ok((time, i * 10));
        }
    })
};

// Consumer: drains the wingfoil stream with an async closure.
let consumer = async move |_ctx: RunParams, mut source: Pin<Box<dyn FutStream<u32>>>| {
    while let Some((time, value)) = source.next().await {
        println!("{time:?}, {value:?}");
    }
    Ok(())
};

produce_async(producer)
    .logged("on-graph", log::Level::Info)
    .collapse()
    .consume_async(Box::new(consumer))
    .run(run_mode, run_for)
    .unwrap();
```
See the async example for more details.
§Graph Execution
Wingfoil uses breadth-first graph execution, which eliminates the O(2^N) explosion that affects depth-first frameworks (reactive libraries, async streams) when nodes branch and recombine.
Each add(&source, &source) branches the upstream node into two inputs and recombines them. Depth-first frameworks visit every path through the graph: 2^N paths at depth N. Wingfoil’s BFS scheduler visits each node exactly once per tick, regardless of how many upstream paths lead to it.
```rust
use wingfoil::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    // Each level adds the previous level to itself, doubling the value.
    let mut source = constant(1_u128);
    for _ in 1..128 {
        source = add(&source, &source);
    }
    source
        .timed()
        .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)?;
    println!("value {:?}", source.peek_value());
    Ok(())
}
```
At 127 levels deep, with 2^127 as the correct answer, the graph completes in 1 tick in under 10µs:
```text
1 ticks processed in 7.207µs, 7.207µs average.
value 170141183460469231731687303715884105728
```
This also eliminates reactive glitches (inconsistent intermediate state from nodes seeing a mix of old and new values in the same tick). See Wikipedia for background.
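A std-only sketch of a glitch, using a hypothetical diamond-shaped graph (Diamond is illustrative, not a wingfoil type): a feeds both b and c, which are recombined into d. A naive depth-first push lets d see the new b alongside the stale c, while evaluating in dependency order computes d once, from consistent inputs.

```rust
/// Toy diamond graph: b = 2a, c = a + 1, d = b + c.
/// Hypothetical illustration only; not how wingfoil nodes are written.
#[derive(Debug)]
struct Diamond {
    a: i64,
    b: i64,
    c: i64,
    /// Every value d has taken, in order.
    d_history: Vec<i64>,
}

impl Diamond {
    fn new(a: i64) -> Self {
        Diamond { a, b: 2 * a, c: a + 1, d_history: Vec::new() }
    }

    /// Naive depth-first push: b propagates to d before c has been updated,
    /// so d is computed twice and the first value mixes new b with stale c.
    fn set_a_depth_first(&mut self, a: i64) {
        self.a = a;
        self.b = 2 * self.a;
        self.d_history.push(self.b + self.c); // glitch: c still holds the old value
        self.c = self.a + 1;
        self.d_history.push(self.b + self.c);
    }

    /// Dependency (breadth-first) order: b and c both update before d runs,
    /// so d is computed once, from consistent inputs.
    fn set_a_breadth_first(&mut self, a: i64) {
        self.a = a;
        self.b = 2 * self.a;
        self.c = self.a + 1;
        self.d_history.push(self.b + self.c);
    }
}

fn main() {
    let mut dfs = Diamond::new(1);
    dfs.set_a_depth_first(10);
    let mut bfs = Diamond::new(1);
    bfs.set_a_breadth_first(10);
    println!("depth-first d values: {:?}", dfs.d_history);
    println!("breadth-first d values: {:?}", bfs.d_history);
}
```

With a = 10 the consistent value of d is 3a + 1 = 31; the depth-first path briefly emits 22, which no downstream node should ever observe.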
§Benchmarks
The bfs_vs_dfs benchmarks measure wingfoil, rxrust, and async streams side-by-side at depths 1–10:
Both depth-first approaches double in cost every level. Wingfoil stays flat. See the breadth first example for more details.
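The doubling cost can be reproduced with a toy model of the add(&source, &source) graph from the breadth-first example. dfs_eval and bfs_eval are hypothetical helpers that count node evaluations; the single pass in dependency order stands in for wingfoil's scheduler and is not its implementation.

```rust
// Toy version of the benchmark graph: node 0 is the constant 1 and
// node i is add(node i-1, node i-1).

/// Naive depth-first: re-evaluates every upstream path.
fn dfs_eval(level: u32, visits: &mut u64) -> u128 {
    *visits += 1;
    if level == 0 {
        1
    } else {
        dfs_eval(level - 1, visits) + dfs_eval(level - 1, visits)
    }
}

/// Dependency (breadth-first) order: each node is evaluated exactly once.
fn bfs_eval(depth: u32, visits: &mut u64) -> u128 {
    let mut value: u128 = 1; // node 0
    *visits += 1;
    for _ in 1..=depth {
        value = value + value; // node i reads node i-1's already-computed value
        *visits += 1;
    }
    value
}

fn main() {
    for depth in [1u32, 5, 10, 20] {
        let (mut dfs_visits, mut bfs_visits) = (0u64, 0u64);
        let dfs_value = dfs_eval(depth, &mut dfs_visits);
        let bfs_value = bfs_eval(depth, &mut bfs_visits);
        assert_eq!(dfs_value, bfs_value);
        println!("depth {depth}: value {bfs_value}, depth-first visits {dfs_visits}, ordered visits {bfs_visits}");
    }
}
```

Depth-first evaluation needs 2^(N+1) - 1 visits at depth N, while the ordered pass needs N + 1, matching the "doubles every level" versus "stays flat" shape of the benchmark.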
§Dynamic Graphs
- Add and remove nodes at runtime without stopping execution
- Three approaches: high-level dynamic_group_stream, hand-rolled MutableNode, and static demux_it
Wingfoil supports modifying the graph at runtime — adding and removing nodes between engine cycles — without stopping execution.
Three examples cover this from different angles:
- dynamic-group: high-level API using dynamic_group_stream to wire per-instrument subgraphs on demand.
- dynamic-manual: low-level equivalent, a custom MutableNode that calls state.add_upstream() and state.remove_node() directly.
- demux: statically-wired alternative using demux_it with a fixed-capacity slot pool; no dynamic wiring required.
All three build the same price aggregator: instruments are created and deleted at runtime, and a running price book is maintained across the changes.
```shell
cargo run --example dynamic-group --features dynamic-graph-beta
cargo run --example dynamic-manual --features dynamic-graph-beta
cargo run --example demux
```
See the dynamic examples for more details.
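The key rule here, that nodes are added or removed between engine cycles rather than mid-cycle, can be sketched with plain collections. Book and Change are hypothetical names and the map entries stand in for per-instrument subgraphs; this is not the dynamic_group_stream or MutableNode API.

```rust
use std::collections::BTreeMap;

/// Hypothetical request to change the graph; queued and applied between cycles.
enum Change {
    Add(String),
    Remove(String),
}

/// Toy price book whose set of per-instrument entries (standing in for
/// per-instrument subgraphs) changes at runtime.
#[derive(Default)]
struct Book {
    /// Latest price per instrument; None until the first update arrives.
    prices: BTreeMap<String, Option<f64>>,
    pending: Vec<Change>,
}

impl Book {
    fn request(&mut self, change: Change) {
        self.pending.push(change);
    }

    /// One engine cycle: only currently wired instruments consume updates, then
    /// queued add/remove requests are applied before the next cycle starts.
    fn cycle(&mut self, updates: &[(&str, f64)]) {
        for (name, price) in updates {
            if let Some(slot) = self.prices.get_mut(*name) {
                *slot = Some(*price);
            }
        }
        for change in self.pending.drain(..) {
            match change {
                Change::Add(name) => {
                    self.prices.entry(name).or_insert(None);
                }
                Change::Remove(name) => {
                    self.prices.remove(&name);
                }
            }
        }
    }
}

fn main() {
    let mut book = Book::default();
    book.request(Change::Add("AAPL".to_string()));
    book.cycle(&[("AAPL", 189.0)]); // AAPL is wired only after this cycle ends
    book.cycle(&[("AAPL", 190.5), ("MSFT", 410.0)]); // MSFT is not wired, so ignored
    println!("{:?}", book.prices);
    book.request(Change::Remove("AAPL".to_string()));
    book.cycle(&[]);
    println!("{:?}", book.prices);
}
```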
§Multithreading
Wingfoil supports multi-threading to distribute workloads across cores. The approach is to compose sub-graphs, each running in its own dedicated thread.
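A minimal std-only sketch of the pattern, assuming a worker sub-graph hands timestamped values to the main thread over a channel. run_worker_pipeline is hypothetical and uses std::sync::mpsc; it is not how producer() is implemented internally.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical sketch: a worker "sub-graph" (here just a loop) runs on its own
/// thread and sends (timestamp, value) pairs to the main thread over a channel.
fn run_worker_pipeline(n: u64) -> Vec<(u64, u64)> {
    let (tx, rx) = mpsc::channel::<(u64, u64)>();
    let worker = thread::spawn(move || {
        for i in 0..n {
            let time_ns = i * 1_000; // hypothetical engine timestamp for tick i
            tx.send((time_ns, i * i)).expect("receiver is alive");
        }
        // `tx` is dropped when the closure returns, which closes the channel.
    });
    // The main thread consumes results in the order the worker sent them;
    // rx.iter() ends once the channel is closed.
    let received: Vec<(u64, u64)> = rx.iter().collect();
    worker.join().expect("worker thread panicked");
    received
}

fn main() {
    for (time_ns, value) in run_worker_pipeline(4) {
        println!("{time_ns}: {value}");
    }
}
```

Carrying the timestamp with each value keeps the receiving side's notion of time explicit, which matters once several worker threads feed the same graph.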
§Messaging
Wingfoil is agnostic to messaging. Its adapters cover a range of latency and reliability trade-offs: iceoryx2 for zero-copy shared memory IPC, Aeron for ultra-low latency UDP/IPC transport, ZeroMQ for low latency server-to-server communication and Kafka for higher latency, fault-tolerant messaging.
§Graph Dynamism
Some use cases require graph dynamism. Consider, for example, Request for Quote (RFQ) markets, where the graph needs to adapt to each incoming RFQ. Conceptually this could be done by changing the shape of the graph at run time. However, we take a simpler approach and virtualise this, following a demux pattern.
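A sketch of the demux idea using plain collections, under the assumption that each RFQ id claims one of a fixed pool of pre-wired slots and releases it when closed. Demux and Routed are hypothetical names; wingfoil's own types in this area are DemuxMap, DemuxEvent and Overflow, whose APIs may differ.

```rust
use std::collections::HashMap;

/// Result of routing one keyed message.
#[derive(Debug, PartialEq)]
enum Routed {
    Slot(usize),
    Overflow,
}

/// Hypothetical fixed-capacity demux: keys (e.g. RFQ ids) are mapped onto a
/// pool of pre-wired slots, so the graph's shape never changes at runtime.
struct Demux {
    slots: HashMap<String, usize>,
    free: Vec<usize>,
}

impl Demux {
    fn new(capacity: usize) -> Self {
        // Reverse so that pop() hands out slot 0 first.
        Demux { slots: HashMap::new(), free: (0..capacity).rev().collect() }
    }

    /// Returns the key's existing slot, claims a free one, or reports overflow.
    fn route(&mut self, key: &str) -> Routed {
        if let Some(&slot) = self.slots.get(key) {
            return Routed::Slot(slot);
        }
        match self.free.pop() {
            Some(slot) => {
                self.slots.insert(key.to_string(), slot);
                Routed::Slot(slot)
            }
            None => Routed::Overflow,
        }
    }

    /// Releases the key's slot so a later key can reuse it.
    fn close(&mut self, key: &str) {
        if let Some(slot) = self.slots.remove(key) {
            self.free.push(slot);
        }
    }
}

fn main() {
    let mut demux = Demux::new(2);
    for rfq in ["RFQ-1", "RFQ-2", "RFQ-3"] {
        println!("{rfq} -> {:?}", demux.route(rfq));
    }
    demux.close("RFQ-1");
    println!("RFQ-3 -> {:?}", demux.route("RFQ-3"));
}
```

The trade-off is a capacity chosen up front: once every slot is taken, new keys overflow until an existing one is closed.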
§Performance
We take performance seriously, and ongoing work is focused on making Wingfoil even closer to a zero-cost abstraction. Currently, the overhead of cycling a node in the graph is less than 10 nanoseconds.
For best performance we recommend using cheaply cloneable types:
- For small strings: arraystring
- For small vectors: tinyvec
- For larger or heap-allocated types:
§Observability
Wingfoil supports both the log and tracing ecosystems via cargo feature flags.
§Feature flags
| Feature | Effect |
|---|---|
| (none) | logged() and GraphState::log() emit via the log crate — wire up any log-compatible backend (e.g. env_logger). |
| tracing | Events are emitted via tracing instead. A tracing subscriber is required; the log bridge ensures events still reach env_logger if none is installed. |
| instrument-run | Adds a tracing span around Graph::run() (the full setup→run→teardown lifecycle). |
| instrument-cycle | Adds a tracing span around each engine cycle (one span per dirty-node batch). |
| instrument-apply-nodes | Adds a tracing span around each lifecycle phase (setup / start / stop / teardown), recording the phase name. |
| instrument-initialise | Adds a tracing span around graph initialisation. |
| instrument-cycle-node | Adds a tracing span per node execution, recording the node index and type name. High frequency, so opt in deliberately. |
| instrument-default | Enables instrument-run, instrument-cycle, instrument-apply-nodes, and instrument-initialise. |
| instrument-all | Enables instrument-default plus instrument-cycle-node. |
All instrument-* features imply tracing.
§Example
```rust
use log::Level::Info;
use std::time::Duration;
use wingfoil::*;

// With the `tracing` feature and a subscriber installed:
// tracing_subscriber::fmt::init();
ticker(Duration::from_secs(1))
    .count()
    .logged("tick", Info) // emits a tracing event per tick when feature = "tracing"
    .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(3))
    .unwrap();
```
See the tracing example for a runnable demonstration.
§Modules
- adapters: A library of input and output adapters.
§Macros
- burst: Macro to create a Burst<T> with type inference.
- latency_stages: Declarative macro that generates a fixed-size, named-field latency record suitable for embedding in a #[repr(C)] payload, plus per-stage marker types. Each field is a u64 nanosecond timestamp. See the module docs for usage.
§Structs
- CallBackStream: A queue of values that are emitted at specified times. Useful for unit tests. Can also be used to feed stream output back into the Graph as input on later cycles.
- ChannelReceiverStream
- DemuxMap: Maintains a map from key to output node, used to demux a source. Used by StreamOperators::demux_it.
- FeedbackSink: Write end of a feedback channel. Clone-able and can be moved into closures. Calling send pushes a value onto the shared queue and schedules the paired source stream to cycle.
- Graph: Engine for co-ordinating the execution of Nodes.
- GraphState: Maintains the parts of the graph state that are accessible to Nodes.
- IteratorStream: Wraps an Iterator and exposes it as a Stream of Burst<T>. Multiple items with the same timestamp are grouped into a single Burst per tick.
- LatencyReport: A sink node that consumes a stream of P: HasLatency and accumulates per-stage delta statistics.
- LatencyStats: Aggregated per-stage statistics for a Latency type.
- MapFilterStream: Maps its source into a new Stream using the supplied closure. Used by map.
- NanoTime: A time in nanoseconds since the UNIX epoch.
- Overflow: Represents a Stream of values that failed to be demuxed because the demux capacity was exceeded. Output of StreamOperators::demux and StreamOperators::demux_it.
- RunParams: Context passed to async producer closures during graph setup.
- SimpleIteratorStream: Wraps an Iterator and exposes it as a Stream of values. The source must be strictly ascending in time. If the source can tick multiple times at the same timestamp, use IteratorStream instead, which emits Burst<T>.
- StageStats: Fixed-size, non-allocating per-stage statistics: count, total, min, max, plus a log2-bucketed histogram for percentile estimation.
- StampPreciseStream: Like StampStream but reads GraphState::wall_time_precise, a fresh TSC snap on every tick. Costs ~5-10 ns per stamp on x86 but gives intra-cycle resolution, so stages running in the same engine cycle get distinct timestamps.
- StampStream: A node that forwards its upstream value while stamping GraphState::wall_time (cycle-start wall-clock snap) into a single named stage of the embedded Latency record.
- Traced: A payload T paired with a latency record L.
- TryIteratorStream: Like IteratorStream but for a fallible iterator: each item is an anyhow::Result<ValueAt<T>>. A row that resolves to Err is surfaced to the graph (failing the run) the moment the stream reaches it, instead of panicking. Successful items are grouped into a Burst<T> per tick, as in IteratorStream.
- UpStreams: The graph can ask a Node what its upstream sources are. The node replies with an UpStreams listing its passive and active sources. All sources are wired upstream. Active nodes trigger Node.cycle() when they tick; passive nodes do not.
§Enums
- DemuxEvent: A message used to signal that a demuxed child stream can be closed. Used by StreamOperators::demux and StreamOperators::demux_it.
- Dep: Wraps a Stream to indicate whether it is an active or passive dependency. Active dependencies trigger downstream nodes when they tick. Passive dependencies are read but don’t trigger execution.
- RunFor: Defines how long the graph should run for. Can be a Duration, a number of cycles or forever.
- RunMode: Whether the Graph should run in RealTime or Historical mode.
§Traits
- AsNode: Used to cast Rc<dyn Stream> to Rc<dyn Node>.
- AsStream: Used to cast an Rc of a concrete stream into an Rc of dyn Stream.
- AsUpstreamNodes: Helper trait so the #[node] macro can call a single method regardless of whether the field is Rc<dyn Node>, Rc<dyn Stream<T>>, or a Vec of either.
- FutStream: A convenience alias for futures::Stream with items of type (NanoTime, T). Used by StreamOperators::consume_async.
- HasLatency: A payload that carries an embedded Latency record.
- IntoNode: Used to consume a concrete MutableNode and return an Rc<dyn Node>.
- IntoStream: Used to consume a concrete Stream and return an Rc<dyn Stream>.
- Latency: A fixed-size, named-field collection of u64 nanosecond timestamps.
- LatencyReportOps: Extension methods to install a LatencyReport sink. Returns the report’s stats handle so the caller can inspect numbers after the graph exits.
- LatencyStreamOps: Extension trait adding .stamp::<Stage>() and friends to streams whose values carry a Latency record.
- MutableNode: Implement this trait to create your own Node.
- Node: A wiring point in the Graph.
- NodeFlowOperators: Flow-control operators for Nodes. These mirror the same-named methods on StreamOperators but operate on tick signals rather than values.
- NodeOperators: A trait containing operators that can be applied to Nodes. Used to support method chaining syntax.
- Stage: A compile-time marker identifying one stage within a Latency record.
- Stream: A Node which has some state that can be peeked at.
- StreamOperators: A trait containing operators that can be applied to Streams. Used to support method chaining syntax.
- StreamPeek: The trait through which a Stream’s current value can be peeked at.
- StreamPeekRef: A trait through which a reference to a Stream’s value can be peeked at.
§Functions
- add: Returns a Stream that adds both its source Streams. Ticks when either of its sources ticks.
- always: Produces a Node that ticks on every engine cycle.
- bimap: Maps two Streams into one using the supplied function. Use Dep::Active and Dep::Passive to control which upstreams trigger execution.
- combine: Collects a Vec of Streams into a Stream of Vec.
- constant: Returns a stream that ticks once with the specified value, on the first cycle.
- feedback: Creates a feedback channel. Returns a (FeedbackSink, Stream) pair. The sink pushes values onto a shared TimeQueue; the source stream pops them on the next engine cycle.
- feedback_node: Creates a feedback channel carrying (). Returns a (FeedbackSink, Node) pair suitable for signalling ticks without carrying a value.
- merge: Returns a stream that merges its sources into one. Ticks when any of its sources ticks. If more than one source ticks at the same time, the first one that was supplied is used.
- never: Produces a Node that never ticks.
- produce_async: Creates a Stream from a fallible async function that produces a futures::Stream.
- producer: Creates a Stream emitting values on this thread but produced on a worker thread.
- ticker: Returns a Node that ticks with the specified period.
- trimap: Maps three Streams into one using the supplied function. Use Dep::Active and Dep::Passive to control which upstreams trigger execution.
- try_bimap: Maps two Streams into one using a fallible closure. Use Dep::Active and Dep::Passive to control which upstreams trigger execution. Errors propagate to graph execution.
- try_trimap: Maps three Streams into one using a fallible closure. Use Dep::Active and Dep::Passive to control which upstreams trigger execution. Errors propagate to graph execution.
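merge picks the first-supplied source when several tick at the same time. A std-only sketch of that rule over pre-recorded (time, value) sources, assuming each source is strictly ascending in time; merge_sorted is hypothetical and replays whole sequences rather than running inside an engine.

```rust
/// Hypothetical sketch of merge's tie-breaking rule. Each source is a strictly
/// time-ascending list of (time, value). At each distinct time, the value from
/// the earliest-supplied source that ticks is emitted; the others are dropped.
fn merge_sorted<T: Clone>(sources: &[Vec<(u64, T)>]) -> Vec<(u64, T)> {
    let mut idx = vec![0usize; sources.len()];
    let mut out = Vec::new();
    loop {
        // Next engine time = smallest pending timestamp across all sources.
        let next = sources
            .iter()
            .zip(&idx)
            .filter_map(|(s, &i)| s.get(i).map(|(t, _)| *t))
            .min();
        let Some(t) = next else { break };
        let mut emitted = false;
        for (s, i) in sources.iter().zip(idx.iter_mut()) {
            if let Some((ts, v)) = s.get(*i) {
                if *ts == t {
                    // Only the first ticking source (in supply order) is used.
                    if !emitted {
                        out.push((t, v.clone()));
                        emitted = true;
                    }
                    *i += 1;
                }
            }
        }
    }
    out
}

fn main() {
    let a = vec![(1, "a1"), (3, "a3")];
    let b = vec![(1, "b1"), (2, "b2")];
    println!("{:?}", merge_sorted(&[a, b]));
}
```

At time 1 both sources tick and only "a1" survives because a was supplied first; "b2" and "a3" pass through because nothing else ticks at those times.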
§Type Aliases
- Burst: A small vector optimised for single-element bursts.
§Attribute Macros
- node: Attribute macro for impl MutableNode blocks.