wingfoil 4.0.0

graph based stream processing framework
[![CI](https://github.com/wingfoil-io/wingfoil/actions/workflows/rust.yml/badge.svg)](https://github.com/wingfoil-io/wingfoil/actions/workflows/rust.yml)
[![codecov](https://codecov.io/gh/wingfoil-io/wingfoil/graph/badge.svg)](https://codecov.io/gh/wingfoil-io/wingfoil)
[![Crates.io Version](https://img.shields.io/crates/v/wingfoil.svg)](https://crates.io/crates/wingfoil)
[![Docs.rs](https://docs.rs/wingfoil/badge.svg)](https://docs.rs/wingfoil/)
[![PyPI - Version](https://img.shields.io/pypi/v/wingfoil.svg)](https://pypi.org/project/wingfoil/)
[![Documentation Status](https://readthedocs.org/projects/wingfoil/badge/?version=latest)](https://wingfoil.readthedocs.io/en/latest/)

# Wingfoil

Wingfoil is a [blazingly fast](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/benches/), highly scalable 
stream processing framework designed for latency-critical use cases such as electronic trading 
and real-time AI systems.

Wingfoil simplifies receiving, processing and distributing streaming data across your entire stack.

## Features

- **Fast**: Ultra-low latency and high throughput with an efficient [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) based execution engine.
- **Simple and obvious to use**: Define your graph of calculations; Wingfoil manages its execution.
- **Multi-language**: currently available as a Rust crate and as a beta [Python package](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil-python), with plans to add WASM/JavaScript/TypeScript support.
- **Backtesting**: [Replay historical](https://docs.rs/wingfoil/latest/wingfoil/#historical-vs-realtime) data to backtest and optimise strategies.
- **Async/Tokio**: seamless integration lets you [leverage async](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/async) at your graph edges.
- **Multi-threading**: [distribute graph execution](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/threading) across cores.
- **I/O Adapters**: production-ready [KDB+](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/kdb/round_trip) integration for tick data, [CSV](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/order_book), [etcd](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/etcd) key-value store, [FIX protocol](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/fix) (FIX 4.4 with TLS), [ZeroMQ](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/messaging) pub/sub messaging (beta), [Prometheus](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/telemetry/prometheus) metrics exporter, [OpenTelemetry OTLP](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/telemetry/otlp) push, etc.
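
To make the "define your graph, the engine runs it" idea concrete, here is a minimal, standard-library-only sketch of evaluating a DAG of calculations in dependency order. It is not Wingfoil's engine or API: `Node`, `evaluate` and `diamond` are hypothetical names chosen for illustration, and the sketch assumes the nodes are already listed in topological order.

```rust
/// A node either holds a source value or combines the outputs of upstream nodes.
/// All names here are illustrative; this is not Wingfoil's API.
pub enum Node {
    Source(f64),
    /// Indices of upstream nodes plus a function over their outputs.
    Calc(Vec<usize>, fn(&[f64]) -> f64),
}

/// Evaluates every node once, assuming `nodes` is in topological order
/// (each `Calc` only refers to earlier indices).
pub fn evaluate(nodes: &[Node]) -> Vec<f64> {
    let mut out: Vec<f64> = Vec::with_capacity(nodes.len());
    for node in nodes {
        let value = match node {
            Node::Source(v) => *v,
            Node::Calc(upstream, f) => {
                // Gather the already-computed upstream outputs.
                let inputs: Vec<f64> = upstream.iter().map(|&i| out[i]).collect();
                f(inputs.as_slice())
            }
        };
        out.push(value);
    }
    out
}

fn sum(xs: &[f64]) -> f64 {
    xs.iter().sum()
}

fn product(xs: &[f64]) -> f64 {
    xs.iter().product()
}

/// Builds a diamond-shaped graph: two sources, their sum, their product,
/// and a final node that adds the sum and the product.
pub fn diamond(a: f64, b: f64) -> Vec<Node> {
    vec![
        Node::Source(a),
        Node::Source(b),
        Node::Calc(vec![0, 1], sum),
        Node::Calc(vec![0, 1], product),
        Node::Calc(vec![2, 3], sum),
    ]
}
```

The diamond shape is the interesting case: the two sources fan out to two calculations and are recombined in the last node, so each intermediate value is computed exactly once per pass.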

## Quick Start

In this example we build a simple, linear pipeline with all nodes ticking in lock-step.

```rust
use wingfoil::*;
use std::time::Duration;
fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period * 3));
}
```
This output is produced:
```pre
hello, world 1
hello, world 2
hello, world 3
```
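
For intuition only, the same count-then-format data flow can be written with standard-library iterators. This sketch drops the timing entirely (no `ticker`, no real-time scheduling) and is not how Wingfoil executes the graph; `hello_lines` is a hypothetical helper name.

```rust
/// Produces the lines the Quick Start pipeline prints for the first `ticks` ticks:
/// a 1-based counter mapped through the same format string.
pub fn hello_lines(ticks: u64) -> Vec<String> {
    (1..=ticks)
        .map(|i| format!("hello, world {}", i))
        .collect()
}
```

The difference in the real pipeline is that each value is produced when the ticker fires rather than all at once.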

## Order Book Example

Wingfoil lets you wire up complex business logic by splitting and recombining streams and altering the frequency of data. I/O adapters make it easy to plug in real data sources and sinks. In this example we load a CSV of AAPL limit orders, maintain an order book using the `lobster` crate, derive trades and two-way prices, and export both back to CSV, all in a few lines:

```rust,ignore
let book = RefCell::new(lobster::OrderBook::default());
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read("aapl.csv", get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write("prices.csv");
let fills_export = fills.csv_write("fills.csv");
Graph::new(vec![prices_export, fills_export], RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
    .print()
    .run()
    .unwrap();
```

This output is produced:

<div align="center">
  <img alt="diagram" src="https://raw.githubusercontent.com/wingfoil-io/wingfoil/refs/heads/main/wingfoil/diagrams/aapl.svg"/>
</div>

[Full example.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/order_book/)
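
The `filter_value` / `map` / `distinct` chain on the price stream has a close standard-library analogue, sketched below on a plain vector. It assumes `distinct` suppresses consecutive repeated values, which is an assumption about the operator's semantics rather than something stated here, and `TwoWayPrice` is a stand-in struct with hypothetical fields.

```rust
/// Stand-in for the example's price type; the fields are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TwoWayPrice {
    pub bid: f64,
    pub ask: f64,
}

/// Drops `None` updates, unwraps the rest, and removes consecutive duplicates.
/// Assumption: this mirrors what `distinct` does on a stream.
pub fn clean_prices(updates: Vec<Option<TwoWayPrice>>) -> Vec<TwoWayPrice> {
    // `flatten` on an iterator of `Option`s keeps only the `Some` payloads.
    let mut prices: Vec<TwoWayPrice> = updates.into_iter().flatten().collect();
    // `dedup` removes only adjacent repeats, so a price that returns later is kept.
    prices.dedup();
    prices
}
```

Under that assumption, a price that reappears after a different one is emitted again, which is usually what a downstream CSV of price changes should contain.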


## KDB+ Example

Define a typed struct, implement `KdbDeserialize` to map rows, and stream time-sliced queries directly into your graph:

```rust,ignore
#[derive(Debug, Clone, Default)]
struct Price {
    sym: Sym,
    mid: f64,
}

kdb_read::<Price, _>(
    conn,
    Duration::from_secs(10),
    |(t0, t1), _date, _iter| {
        format!(
            "select time, sym, mid from prices \
             where time >= (`timestamp$){}j, time < (`timestamp$){}j",
            t0.to_kdb_timestamp(),
            t1.to_kdb_timestamp(),
        )
    },
)
.logged("prices", Info)
.run(
    RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
    RunFor::Duration(Duration::from_secs(100)),
)?;
```

[Full example.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/kdb/read/)
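
The query closure above receives one `(t0, t1)` window per slice. As a rough illustration of time slicing, this sketch cuts a range into consecutive half-open windows and renders one query per window. `slices` and `query_for` are hypothetical helpers over raw integer timestamps, not part of the adapter, and how `kdb_read` actually chooses its windows is not shown in this README.

```rust
/// Splits `[start, end)` into consecutive half-open windows of at most `step` units.
pub fn slices(start: i64, end: i64, step: i64) -> Vec<(i64, i64)> {
    assert!(step > 0, "step must be positive");
    let mut out = Vec::new();
    let mut t0 = start;
    while t0 < end {
        // The final window is clipped so it never runs past `end`.
        let t1 = (t0 + step).min(end);
        out.push((t0, t1));
        t0 = t1;
    }
    out
}

/// Renders the same query text as the example for one window.
pub fn query_for((t0, t1): (i64, i64)) -> String {
    format!(
        "select time, sym, mid from prices \
         where time >= (`timestamp$){}j, time < (`timestamp$){}j",
        t0, t1
    )
}
```

Half-open windows (`>= t0`, `< t1`) matter here: each row falls into exactly one slice, so nothing is read twice at a window boundary.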

## etcd Example

Watch a key prefix, transform values, and write results back — all in a declarative graph:

```rust,ignore
use wingfoil::adapters::etcd::*;
use wingfoil::*;

let conn = EtcdConnection::new("http://localhost:2379");

let round_trip = etcd_sub(conn.clone(), "/source/")
    .map(|burst| {
        burst.into_iter().map(|event| {
            let upper = event.entry.value_str().unwrap_or("").to_uppercase().into_bytes();
            EtcdEntry { key: event.entry.key.replacen("/source/", "/dest/", 1), value: upper }
        })
        .collect::<Burst<EtcdEntry>>()
    })
    .etcd_pub(conn, None, true);

round_trip.run(RunMode::RealTime, RunFor::Cycles(3)).unwrap();
```

[Full example.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/etcd/)
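
The per-event transformation inside the `map` closure is plain string handling, so it can be checked in isolation. The sketch below uses a hypothetical `Entry` struct and `to_dest` function in place of the adapter's `EtcdEntry`, and treats values that are not valid UTF-8 as empty, as the `unwrap_or("")` above suggests.

```rust
/// Stand-in for a key/value entry; not the adapter's `EtcdEntry` type.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub key: String,
    pub value: Vec<u8>,
}

/// Moves an entry from the `/source/` prefix to `/dest/` and upper-cases its value.
/// Values that are not valid UTF-8 become empty.
pub fn to_dest(entry: &Entry) -> Entry {
    let text = std::str::from_utf8(&entry.value).unwrap_or("");
    Entry {
        // `replacen(.., 1)` rewrites only the first occurrence, so a later
        // "/source/" inside the key is left alone.
        key: entry.key.replacen("/source/", "/dest/", 1),
        value: text.to_uppercase().into_bytes(),
    }
}
```

Keeping the transformation as a pure function like this makes it easy to unit test without a running etcd instance.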

## ZeroMQ Example

Publish a stream over ZMQ and subscribe from another process — cross-language compatible with the Python bindings:

```rust,ignore
// publisher
use wingfoil::adapters::zmq::ZeroMqPub;
use wingfoil::*;

ticker(Duration::from_millis(100))
    .count()
    .map(|n: u64| format!("{n}").into_bytes())
    .zmq_pub(7779, ())
    .run(RunMode::RealTime, RunFor::Forever)?;
```

```rust,ignore
// subscriber
use wingfoil::adapters::zmq::zmq_sub;
use wingfoil::*;

let (data, _status) = zmq_sub::<Vec<u8>>("tcp://127.0.0.1:7779")?;
// See wingfoil-python/examples/zmq/ for a Python subscriber
data.print()
    .run(RunMode::RealTime, RunFor::Forever)?;
```

Service discovery via etcd is also supported — see the [etcd examples](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/zmq/etcd/) for details.

[Full example.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/zmq/)
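
The publisher above sends each count as the UTF-8 bytes of its decimal representation, so a subscriber that wants the number back has to reverse that step. The helpers below are a hypothetical sketch of the round trip using only the standard library; they say nothing about how the adapter frames messages on the wire.

```rust
/// Encodes a count the way the publisher's `map` closure does.
pub fn encode(n: u64) -> Vec<u8> {
    format!("{n}").into_bytes()
}

/// Decodes a payload back into a count, returning `None` for
/// non-UTF-8 or non-numeric input instead of panicking.
pub fn decode(payload: &[u8]) -> Option<u64> {
    std::str::from_utf8(payload).ok()?.parse().ok()
}
```

A text payload like this is easy to consume from other languages; a fixed-width binary encoding would be more compact but needs an agreed byte order.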

## FIX Protocol Example

Connect to a FIX 4.4 exchange (e.g. LMAX London Demo) over TLS, subscribe to market data, and process incoming messages — all as a streaming graph:

```rust,ignore
use wingfoil::adapters::fix::fix_connect_tls;
use wingfoil::*;

let fix = fix_connect_tls(
    "fix-marketdata.london-demo.lmax.com", 443,
    &username, "LMXBDM", Some(&password),
);

// Subscribe to EUR/USD — waits for LoggedIn, then sends the request.
let sub = fix.fix_sub(constant(vec!["4001".into()]));

let data_node = fix.data.logged("fix-data", Info).as_node();
let status_node = fix.status.logged("fix-status", Info).as_node();

Graph::new(
    vec![data_node, status_node, sub],
    RunMode::RealTime,
    RunFor::Duration(Duration::from_secs(60)),
)
.run()
.unwrap();
```

Run the self-contained loopback example (no external FIX engine needed):
```sh
RUST_LOG=info cargo run --example fix_loopback --features fix
```

[Full examples.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/fix/)
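
On the wire, FIX 4.4 messages are `tag=value` fields separated by the SOH byte (0x01), and the trailing `10=` field carries a checksum: the sum of all preceding bytes modulo 256, written as three digits. The sketch below shows that format with the standard library only. The function names are hypothetical, this is not the adapter's parser, and it skips the header and body-length handling a real engine needs.

```rust
const SOH: u8 = 0x01;

/// FIX checksum: sum of all bytes preceding the `10=` field, modulo 256,
/// rendered as exactly three digits.
pub fn checksum(body: &[u8]) -> String {
    let sum: u32 = body.iter().map(|&b| u32::from(b)).sum();
    format!("{:03}", sum % 256)
}

/// Splits a message into `(tag, value)` pairs, skipping malformed fields.
pub fn fields(msg: &[u8]) -> Vec<(u32, String)> {
    msg.split(|&b| b == SOH)
        .filter_map(|field| {
            let text = std::str::from_utf8(field).ok()?;
            let (tag, value) = text.split_once('=')?;
            let tag: u32 = tag.parse().ok()?;
            Some((tag, value.to_string()))
        })
        .collect()
}
```

For instance, a fragment containing `35=V` (a Market Data Request) followed by `262=1` splits into the pairs `(35, "V")` and `(262, "1")`.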

## Telemetry Example

Export stream metrics to Grafana via Prometheus scraping (pull) or OpenTelemetry OTLP push — or both simultaneously:

```rust,ignore
use wingfoil::adapters::prometheus::PrometheusExporter;
use wingfoil::adapters::otlp::{OtlpConfig, OtlpPush};
use wingfoil::*;

let exporter = PrometheusExporter::new("0.0.0.0:9091");
exporter.serve()?;

let config = OtlpConfig {
    endpoint: "http://localhost:4318".into(),
    service_name: "my-app".into(),
};

let counter = ticker(Duration::from_secs(1)).count();
let prometheus_node = exporter.register("wingfoil_ticks_total", counter.clone());
let otlp_node = counter.otlp_push("wingfoil_ticks_total", config);

Graph::new(vec![prometheus_node, otlp_node], RunMode::RealTime, RunFor::Forever).run()?;
```

[Full example.](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples/telemetry/)
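
On the pull side, Prometheus scrapes a plain-text page. As a rough picture of what a scrape of the counter above might contain, this sketch renders one counter in the Prometheus text exposition format. `render_counter` is a hypothetical helper; the exporter's actual output (help text, labels, extra metrics) is not shown in this README and may differ.

```rust
/// Renders a single counter sample in the Prometheus text exposition format:
/// a `# TYPE` line followed by `<name> <value>`.
pub fn render_counter(name: &str, value: u64) -> String {
    format!("# TYPE {name} counter\n{name} {value}\n")
}
```

The `_total` suffix used in the example follows the usual Prometheus naming convention for counters.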

## Links
- Check out the [examples](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/examples)
- Download from [crates.io](https://crates.io/crates/wingfoil/)
- Read the [documentation](https://docs.rs/wingfoil/latest/wingfoil/)
- Review the [benchmarks](https://github.com/wingfoil-io/wingfoil/tree/main/wingfoil/benches/)
- Download the wingfoil Python module from [pypi.org](https://pypi.org/project/wingfoil/)

## Get Involved!

We want to hear from you, especially if you:
- are interested in [contributing](https://github.com/wingfoil-io/wingfoil/blob/main/CONTRIBUTING.md)
- know of a project that Wingfoil would be well suited for
- would like to request a feature or report a bug
- have any feedback

Please do get in touch:
- ping us on [Discord](https://discord.gg/rfGqf3Ff)
- email us at [hello@wingfoil.io](mailto:hello@wingfoil.io)
- submit an [issue](https://github.com/wingfoil-io/wingfoil/issues)
- get involved in the [discussion](https://github.com/wingfoil-io/wingfoil/discussions/)