§ObzenFlow
ObzenFlow is a high-performance event streaming and processing framework for Rust, built around durable per-stage journals, typed events, and an ergonomics-first DSL for composing pipelines.
Status: pre-1.0. APIs are still evolving and may change between releases.
§Principles
ObzenFlow is built around journal-first execution, wide-event observability, and evidence-based correctness. Every stage reads from upstream append-only journals and writes its outputs to its own journal, making the system’s journaled history both the execution substrate and the primary observability surface.
For the full design philosophy, see obzenflow.dev/philosophy.
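The journal-first model can be sketched in plain Rust. The following is a conceptual illustration only, not ObzenFlow's actual journal API: each stage appends to its own log, downstream stages read from an offset, and replaying history is just re-reading from offset zero.

```rust
// Conceptual sketch of journal-first execution (not the real ObzenFlow API).
// Each stage owns an append-only journal; downstream stages read by offset.
struct Journal<T> {
    entries: Vec<T>,
}

impl<T: Clone> Journal<T> {
    fn new() -> Self {
        Journal { entries: Vec::new() }
    }

    // Append an event and return its offset in the journal.
    fn append(&mut self, event: T) -> usize {
        self.entries.push(event);
        self.entries.len() - 1
    }

    // Read everything at or after `offset` -- replay is just offset 0.
    fn read_from(&self, offset: usize) -> &[T] {
        &self.entries[offset.min(self.entries.len())..]
    }
}

fn main() {
    let mut source_journal = Journal::new();
    source_journal.append(21);
    source_journal.append(4);

    // A downstream "stage" reads the upstream journal and writes its own,
    // so the journaled history doubles as the observability surface.
    let mut transform_journal = Journal::new();
    for n in source_journal.read_from(0) {
        transform_journal.append(n * 2);
    }
    println!("{:?}", transform_journal.read_from(0));
}
```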
Every ObzenFlow application follows the same shape:
```rust
FlowApplication::run(flow! {
    name: "my_pipeline",
    journals: disk_journals("target/logs".into()),
    middleware: [rate_limit(100.0)],
    stages: {
        src = source!("input" => my_source);
        xform = transform!("enrich" => my_transform);
        out = sink!("output" => my_sink);
    },
    topology: {
        src |> xform |> out;
    }
})
.await?;
```

For runnable versions with real domain types and handlers, see the examples catalog in examples/README.md.
§Quickstart: run a real end-to-end demo (HTTP ingestion)
Prerequisites:
- Rust 1.93.0 (pinned in `rust-toolchain.toml`)
Run (starts the web server and /metrics):
```sh
cargo run -p obzenflow --example http_ingestion_piggy_bank_demo --features obzenflow_infra/warp-server -- --server --server-port 9090
```

In another terminal, post a couple of events:
```sh
curl -XPOST http://127.0.0.1:9090/api/bank/accounts/events \
  -H 'content-type: application/json' \
  -d '{"event_type":"bank.account","data":{"account_id":"acct-1","owner":"Alice","initial_balance_cents":1000}}'
curl -XPOST http://127.0.0.1:9090/api/bank/tx/events \
  -H 'content-type: application/json' \
  -d '{"event_type":"bank.tx","data":{"account_id":"acct-1","delta_cents":-99,"note":"coffee"}}'
```

Observe:
- Metrics: `curl http://127.0.0.1:9090/metrics`
- Topology: `curl http://127.0.0.1:9090/api/topology`
Code: examples/http_ingestion_piggy_bank_demo.rs
§More examples
The full catalog with grouped commands and code pointers is in examples/README.md. A few highlights:
```sh
# Framework overview: reference catalogs + joins + stateful summary
cargo run -p obzenflow --example product_catalog_enrichment

# Resilience: circuit breaker + typed fallback + contracts
cargo run -p obzenflow --example payment_gateway_resilience

# Middleware inheritance/override (observe /metrics while it runs)
cargo run -p obzenflow --example flow_middleware_config --features obzenflow_infra/warp-server -- --server
```

No features are enabled by default. `--features obzenflow_infra/warp-server` enables the HTTP server and web endpoints, and `--features http-pull` enables HTTP pull sources. See crates/obzenflow_infra/README.md for the full feature matrix.
An optional Prometheus + Grafana monitoring stack is available in monitoring/ (see monitoring/README.md).
§Project organization
ObzenFlow follows an onion architecture: obzenflow_core defines the business domain and “ports” (traits), and outer layers provide implementations, orchestration, wiring, and concrete integrations.
Inner layers are intentionally generic (domain types + traits) and avoid I/O and runtime/framework integration. Outer layers provide concrete implementations (journals, web/HTTP, middleware/exporters) and wire them into runtime services via traits and composition.
- crates/obzenflow_core/README.md: core domain types + stable interfaces (events, journals, contracts, middleware ports)
- crates/obzenflow_runtime/README.md: stage execution + supervisors + runtime orchestration (the engine)
- crates/obzenflow_dsl/README.md: the `flow!` DSL and how it builds a runnable flow graph (including middleware resolution)
- crates/obzenflow_infra/README.md: `FlowApplication` + journaling/web/HTTP implementations, mostly behind feature flags
- crates/obzenflow_adapters/README.md: middleware + concrete sources/sinks (connectors) intended to be composed into flows
The root obzenflow crate is a convenience re-export layer for common sources/sinks (src/sources.rs, src/sinks.rs).
§Project policies
- Contributing: CONTRIBUTING.md
- Code of Conduct: CODE_OF_CONDUCT.md
- Governance: GOVERNANCE.md
- Security: SECURITY.md
- Trademarks: TRADEMARKS.md
§License
Dual-licensed under MIT OR Apache-2.0. See LICENSE-MIT and LICENSE-APACHE.
§Anatomy of a FlowApplication
Every ObzenFlow application follows the same shape: define domain types,
implement handlers, wire them together with the flow! macro, and launch
with FlowApplication::run().
§1. Domain types
Define your events as Rust structs and implement obzenflow_core::TypedPayload
so the framework knows the event type string and schema version at compile time.
```rust
use serde::{Deserialize, Serialize};
use obzenflow_core::TypedPayload;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct TemperatureReading {
    sensor_id: String,
    celsius: f64,
}

impl TypedPayload for TemperatureReading {
    const EVENT_TYPE: &'static str = "sensor.temperature";
    const SCHEMA_VERSION: u32 = 1;
}
```

§2. Handlers
Handlers contain the processing logic for each stage. The framework provides several handler traits, each matching a different stage role.
Sources produce events. obzenflow_runtime::stages::source::FiniteSourceTyped
is the easiest way to emit a Vec<T> of typed payloads:
```rust
use obzenflow_runtime::stages::source::FiniteSourceTyped;

let readings = vec![
    TemperatureReading { sensor_id: "A1".into(), celsius: 22.5 },
    TemperatureReading { sensor_id: "B2".into(), celsius: 35.1 },
];
let source = FiniteSourceTyped::new(readings);
```

Transforms process events one at a time. Implement
obzenflow_runtime::stages::TransformHandler for full control, or use
the typed helpers like MapTyped for simple one-to-one mappings.
Sinks consume events at the end of a pipeline. Implement
obzenflow_runtime::stages::SinkHandler, or use a closure with the
sink! macro for quick prototyping.
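Conceptually, a transform is just a function from input events to output events, and a sink is a closure that consumes them; the framework adds journaling and supervision around that shape. A plain-Rust sketch of the idea, with no framework involved (`Reading` and `to_message` are hypothetical names, not ObzenFlow types):

```rust
// Conceptual model of a transform + sink, outside the framework.
// Reading is a hypothetical domain type for illustration only.
#[derive(Debug, Clone)]
struct Reading {
    sensor: String,
    celsius: f64,
}

// The "transform": a one-to-one mapping, the kind of fn MapTyped wraps.
fn to_message(r: &Reading) -> String {
    if r.celsius > 50.0 {
        format!("{}: HIGH {:.1}C", r.sensor, r.celsius)
    } else {
        format!("{}: normal {:.1}C", r.sensor, r.celsius)
    }
}

fn main() {
    let readings = vec![
        Reading { sensor: "A1".into(), celsius: 22.5 },
        Reading { sensor: "B2".into(), celsius: 85.0 },
    ];

    // The "sink": a closure consuming transformed events, as with sink!.
    let mut sink = |msg: String| println!("{msg}");
    for r in &readings {
        sink(to_message(r));
    }
}
```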
§3. The flow! block
The obzenflow_dsl::flow! macro takes five sections:
§name:
A string identifier for the flow. Used for journal directory naming and metrics labelling.
§journals:
An expression that returns a per-flow journal factory.
- `disk_journals(path)` produces durable, file-backed journals (production).
- `memory_journals()` produces in-memory journals (tests and benchmarks).
§middleware:
Flow-level middleware factories applied to every stage by default. Individual stages can override by supplying the same middleware type at the stage level.
- `rate_limit(events_per_sec)` applies token-bucket rate limiting.
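Token-bucket rate limiting admits an event only when a token is available; tokens refill at the configured rate up to a burst capacity. A minimal self-contained sketch of the mechanism (illustrative only, not the `rate_limit` middleware's implementation):

```rust
// Minimal token-bucket sketch (illustrative; not the rate_limit middleware).
struct TokenBucket {
    capacity: f64,       // maximum burst size
    tokens: f64,         // currently available tokens
    refill_per_sec: f64, // the events_per_sec rate
}

impl TokenBucket {
    fn new(events_per_sec: f64) -> Self {
        TokenBucket {
            capacity: events_per_sec,
            tokens: events_per_sec,
            refill_per_sec: events_per_sec,
        }
    }

    // Advance time, refilling tokens up to capacity.
    fn tick(&mut self, elapsed_secs: f64) {
        self.tokens =
            (self.tokens + elapsed_secs * self.refill_per_sec).min(self.capacity);
    }

    // Try to admit one event; returns false when rate-limited.
    fn try_admit(&mut self) -> bool {
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = TokenBucket::new(2.0); // 2 events/sec, burst of 2
    assert!(bucket.try_admit());
    assert!(bucket.try_admit());
    assert!(!bucket.try_admit()); // burst exhausted
    bucket.tick(0.5); // half a second refills one token at 2/sec
    assert!(bucket.try_admit());
    println!("token-bucket demo ok");
}
```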
§stages:
Let-bindings that produce stage descriptors via macros:
- `source!("name" => handler)` for a finite source.
- `async_source!("name" => handler)` for an async finite source.
- `infinite_source!("name" => handler)` for an infinite source.
- `async_infinite_source!("name" => handler)` for an async infinite source.
- `transform!("name" => handler)` for a synchronous transform.
- `async_transform!("name" => handler)` for an async transform.
- `sink!("name" => handler)` for a sink (struct or closure).
- `stateful!("name" => handler)` for stateful aggregation.
- `join!("name" => with_ref!(ref_stage, handler))` for joining with reference data.
All stage macros accept an optional middleware array:
`source!("name" => handler, [rate_limit(10.0)])`.
§topology:
Edges connecting stages:
- `a |> b;` declares a forward edge (a feeds into b).
- `a <| b;` declares a backward/feedback edge.
- `(reference, stream) |> joiner;` wires both inputs into a join stage.
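Putting the edge forms together, a topology that enriches a stream and then joins it against reference data could be declared as follows (stage names here are illustrative, not from a runnable example):

```rust
topology: {
    src |> enrich;
    (catalog, enrich) |> joined;
    joined |> out;
}
```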
§4. FlowApplication::run()
obzenflow_infra::application::FlowApplication handles runtime setup,
optional HTTP server, CLI argument parsing, Prometheus metrics, and graceful
shutdown. Pass the flow! { ... } block directly to run():
```rust
FlowApplication::run(flow! { ... }).await?;
```

Or use the builder for finer control:
```rust
FlowApplication::builder()
    .with_log_level(LogLevel::Info)
    .run_async(flow! { ... })
    .await?;
```

§End-to-end example
```rust
use anyhow::Result;
use obzenflow_core::TypedPayload;
use obzenflow_dsl::{flow, sink, source, transform};
use obzenflow_infra::application::FlowApplication;
use obzenflow_infra::journal::disk_journals;
use obzenflow_runtime::stages::transform::MapTyped;
use obzenflow_runtime::stages::source::FiniteSourceTyped;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Measurement {
    sensor: String,
    celsius: f64,
}

impl TypedPayload for Measurement {
    const EVENT_TYPE: &'static str = "sensor.measurement";
    const SCHEMA_VERSION: u32 = 1;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Alert {
    sensor: String,
    message: String,
}

impl TypedPayload for Alert {
    const EVENT_TYPE: &'static str = "sensor.alert";
    const SCHEMA_VERSION: u32 = 1;
}

#[tokio::main]
async fn main() -> Result<()> {
    let readings = vec![
        Measurement { sensor: "A1".into(), celsius: 22.5 },
        Measurement { sensor: "B2".into(), celsius: 85.0 },
        Measurement { sensor: "C3".into(), celsius: 42.1 },
    ];

    FlowApplication::run(flow! {
        name: "temp_alerts",
        journals: disk_journals("target/temp-alerts-logs".into()),
        middleware: [],
        stages: {
            src = source!("readings" => FiniteSourceTyped::new(readings));
            check = transform!("threshold_check" => MapTyped::new(|m: Measurement| {
                Alert {
                    sensor: m.sensor.clone(),
                    message: if m.celsius > 50.0 {
                        format!("{}: HIGH {:.1}C", m.sensor, m.celsius)
                    } else {
                        format!("{}: normal {:.1}C", m.sensor, m.celsius)
                    },
                }
            }));
            out = sink!("alerts" => |alert: Alert| {
                println!("[ALERT] {}", alert.message);
            });
        },
        topology: {
            src |> check;
            check |> out;
        }
    })
    .await?;
    Ok(())
}
```

§Crate organisation
This facade crate re-exports common types from the internal crates so that
simple applications only need obzenflow in their [dependencies]. The
internal crates provide the full implementation:
- `obzenflow_core` defines the business domain (events, journals, contracts, typed IDs).
- `obzenflow_runtime` contains the execution engine (stage supervisors, pipeline orchestration, metrics).
- `obzenflow_adapters` provides middleware, concrete sources/sinks, and monitoring exporters.
- `obzenflow_dsl` implements the `flow!` macro and stage descriptor macros.
- `obzenflow_infra` houses `FlowApplication`, journal backends, and the optional web server.