<p align="center">
<img src="logo.png" alt="Pulse logo" width="720" />
</p>
# Pulse
[CI](https://github.com/brbtavares/pulse/actions/workflows/ci.yml) · [Coverage](https://codecov.io/gh/brbtavares/pulse)
| Crate | crates.io | docs.rs |
| --- | --- | --- |
| pulse-core | [crates.io](https://crates.io/crates/pulse-core) | [docs.rs](https://docs.rs/pulse-core) |
| pulse-state | [crates.io](https://crates.io/crates/pulse-state) | [docs.rs](https://docs.rs/pulse-state) |
| pulse-ops | [crates.io](https://crates.io/crates/pulse-ops) | [docs.rs](https://docs.rs/pulse-ops) |
| pulse-io | [crates.io](https://crates.io/crates/pulse-io) | [docs.rs](https://docs.rs/pulse-io) |
Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals:
- Local-first: zero external services required for the common path
- Stream processing with event-time and watermarks
- Configurable windowing (tumbling/sliding/session), also available from the CLI
- Pluggable state backends (in-memory and optional RocksDB)
- File and Parquet I/O, plus optional Kafka
- First-class observability (tracing/Prometheus)
## Local-first vision
- Single binary you can run on your laptop or inside a container without standing up a cluster or control plane.
- Files-in/files-out by default; opt into Kafka when needed.
- Deterministic replay support (EOF watermark) so you can iterate quickly and write golden tests.
- Ergonomics first: small, readable codebase; simple config; sensible defaults.
## Workspace structure
- `pulse-core`: core types/traits, `Executor`, `Record`, event-time `Watermark`, timers, metrics, and config loader
- `pulse-ops`: operators (`Map`, `Filter`, `KeyBy`, `Aggregate`, `WindowedAggregate`), event-time window helpers
- `pulse-state`: state backends
- `InMemoryState` (default)
- `RocksDbState` (feature `rocksdb`)
- `pulse-io`: sources/sinks
- `FileSource` (JSONL/CSV) with EOF watermark
- `FileSink` (JSONL)
- `ParquetSink` (feature `parquet`) with date partitioning and rotation
- `KafkaSource`/`KafkaSink` (feature `kafka`) with resume from persisted offsets
- `pulse-examples`: runnable examples and sample data
- `pulse-bin`: CLI (`pulse`) and `/metrics` HTTP server
## Core features
### Event-time & watermarks
- `Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }`
- `Watermark(EventTime)` propagated through the pipeline
- Executor drives `on_watermark` for operators and informs sinks
- Lag metric (`pulse_watermark_lag_ms`) = now - watermark
Semantics overview:
- Event-time is derived from your data (RFC3339 string or epoch ms).
- Sources advance a low watermark to signal that no records with event time ≤ t are still expected.
- Operators emit window results when `watermark >= window.end`.
- FileSource emits a final EOF watermark far in the future to flush all windows in batch-like runs.
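As an illustration of the firing rule, here is a small sketch (illustrative only; `Window` and `fire_ready` are hypothetical names, not the pulse-ops API):
```rust
use chrono::{DateTime, Duration, TimeZone, Utc};

/// Hypothetical half-open event-time window [start, end).
struct Window {
    start: DateTime<Utc>,
    end: DateTime<Utc>,
}

/// Core firing rule: a window may emit once the watermark reaches its end.
/// (Allowed lateness, when configured, pushes this point further out.)
fn fire_ready(w: &Window, watermark: DateTime<Utc>) -> bool {
    watermark >= w.end
}

fn main() {
    let w = Window {
        start: Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(),
        end: Utc.with_ymd_and_hms(2024, 1, 1, 0, 1, 0).unwrap(),
    };
    // Watermark still inside the window: results are held back.
    assert!(!fire_ready(&w, w.end - Duration::seconds(1)));
    // Watermark past the end (e.g. the EOF watermark): the window flushes.
    assert!(fire_ready(&w, w.end + Duration::seconds(1)));
    println!("window [{} .. {}) is ready to fire", w.start, w.end);
}
```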
### Windowing
- Operator: `WindowedAggregate` supports tumbling, sliding, and session windows
- The CLI supports tumbling, sliding, and session with aggregations: `count`, `sum`, `avg`, `distinct`
- EOF watermark emitted by `FileSource` flushes windows
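How event times map to windows can be sketched with plain millisecond arithmetic (illustrative only; the actual pulse-ops types differ):
```rust
/// Tumbling: each event time belongs to exactly one window of `size_ms`.
fn tumbling_window(ts_ms: i64, size_ms: i64) -> (i64, i64) {
    let start = ts_ms - ts_ms.rem_euclid(size_ms);
    (start, start + size_ms)
}

/// Sliding: a window of `size_ms` starts every `slide_ms`, so one event
/// can belong to several overlapping windows.
fn sliding_windows(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    let mut windows = Vec::new();
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    while start + size_ms > ts_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows
}

fn main() {
    // 60s tumbling window: an event at 00:01:05 falls in [00:01:00, 00:02:00).
    assert_eq!(tumbling_window(65_000, 60_000), (60_000, 120_000));
    // 60s windows sliding every 15s: the same event lands in four of them.
    assert_eq!(sliding_windows(65_000, 60_000, 15_000).len(), 4);
    // Session windows (gap-based) are not shown: they merge events whose
    // inter-arrival gaps stay within the configured `gap`.
}
```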
### State & snapshots
- `KvState` trait: `get/put/delete/iter_prefix/snapshot/restore`
- `pulse-state::InMemoryState` implements full API (snapshots kept in-memory)
- `pulse-state::RocksDbState` (feature `rocksdb`): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- `WindowOperator` can persist per-window state via a backend and restore after restart (optional hook)
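For orientation, the state surface looks roughly like the sketch below (illustrative signatures and a toy in-memory backend; see pulse-state for the real trait, error handling, and snapshot format):
```rust
use std::collections::BTreeMap;

/// Rough shape of a key-value state backend (names and signatures are
/// illustrative; the real `KvState` trait differs in details and errors).
trait KvStateSketch {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn delete(&mut self, key: &[u8]);
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    fn snapshot(&self) -> Vec<(Vec<u8>, Vec<u8>)>;
    fn restore(&mut self, snapshot: Vec<(Vec<u8>, Vec<u8>)>);
}

/// Minimal in-memory backend over a sorted map, so prefix scans are cheap.
#[derive(Default)]
struct MemState(BTreeMap<Vec<u8>, Vec<u8>>);

impl KvStateSketch for MemState {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> { self.0.get(key).cloned() }
    fn put(&mut self, key: &[u8], value: &[u8]) { self.0.insert(key.to_vec(), value.to_vec()); }
    fn delete(&mut self, key: &[u8]) { self.0.remove(key); }
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.0.iter()
            .filter(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }
    fn snapshot(&self) -> Vec<(Vec<u8>, Vec<u8>)> { self.0.clone().into_iter().collect() }
    fn restore(&mut self, snapshot: Vec<(Vec<u8>, Vec<u8>)>) { self.0 = snapshot.into_iter().collect(); }
}

fn main() {
    let mut s = MemState::default();
    s.put(b"window/60000/word=hi", b"3");
    s.put(b"window/60000/word=pulse", b"1");
    assert_eq!(s.iter_prefix(b"window/60000/").len(), 2);
    let snap = s.snapshot();
    s.delete(b"window/60000/word=hi");
    s.restore(snap);
    assert!(s.get(b"window/60000/word=hi").is_some());
}
```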
### Guarantees (MVP)
- Single-node at-least-once: each record is processed at least once; exactly-once is not guaranteed.
- FileSource is deterministic (EOF watermark). KafkaSource commits offsets periodically (configurable); on restart, offsets are available in KvState for recovery logic.
- Operator state updates and sink writes are not transactional as a unit (no two-phase commit in MVP).
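The at-least-once behaviour is easiest to see with a toy resume loop (hypothetical names; the real KafkaSource persists offsets through rdkafka and the KvState backend):
```rust
use std::collections::HashMap;

/// Toy offset store standing in for offsets persisted in state.
struct OffsetStore(HashMap<String, i64>);

impl OffsetStore {
    fn load(&self, partition: &str) -> i64 {
        *self.0.get(partition).unwrap_or(&0)
    }
    fn commit(&mut self, partition: &str, next_offset: i64) {
        self.0.insert(partition.to_string(), next_offset);
    }
}

fn main() {
    let mut store = OffsetStore(HashMap::new());

    // First run: offsets 0..=9 are processed (sink writes happen as we go),
    // but the periodic commit only captured progress up to offset 5.
    for offset in 0..10i64 {
        // process(record_at(offset));
        if offset == 5 {
            store.commit("topic-0", offset + 1);
        }
    }
    // Crash here: offsets 6..=9 were processed but never committed.

    // Restart: resume from the last committed offset, so 6..=9 run again.
    // Duplicates are possible, loss is not -- that is at-least-once.
    assert_eq!(store.load("topic-0"), 6);
}
```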
### Limitations (MVP)
- No cluster/distributed runtime yet (single process, single binary).
- No SQL/DSL planner; define pipelines in Rust or via TOML.
- Checkpoint/resume orchestration is minimal: offsets/snapshots exist, but full CLI-driven recovery is a follow-up.
- Kafka is optional and depends on native `librdkafka`.
### I/O
- `FileSource` (JSONL/CSV): parses `event_time` from RFC3339 strings or epoch milliseconds (see the parsing sketch after this list); emits a final watermark at EOF
- `FileSink`: writes JSON lines to stdout/file
- `ParquetSink` (feature `parquet`):
- Schema: `event_time: timestamp(ms)`, `payload: utf8` (full JSON)
- Partitioning:
- By date (default): `out_dir/dt=YYYY-MM-DD/part-*.parquet` (configurable format via `partition_format`)
- By field: `out_dir/<field>=<value>/part-*.parquet` (set `partition_field`)
- Rotation: by row-count, time, and optional bytes (`max_bytes`)
- Compression: `snappy` (default), `zstd`, or `none`
- Tested: writes files, then reads them back via the Arrow reader and asserts row counts
- `KafkaSource`/`KafkaSink` (feature `kafka`): integration with `rdkafka`, with resuming offsets from persisted state
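The FileSource timestamp rule (RFC3339 string or epoch milliseconds, as noted above) amounts to something like the following sketch using chrono and serde_json (not the exact pulse-io code):
```rust
use chrono::{DateTime, TimeZone, Utc};
use serde_json::Value;

/// Accept either an RFC3339 string ("2024-01-01T00:00:05Z") or an
/// integer epoch in milliseconds (1704067205000) as the event time.
fn parse_event_time(v: &Value) -> Option<DateTime<Utc>> {
    match v {
        Value::String(s) => DateTime::parse_from_rfc3339(s)
            .ok()
            .map(|dt| dt.with_timezone(&Utc)),
        Value::Number(n) => n
            .as_i64()
            .and_then(|ms| Utc.timestamp_millis_opt(ms).single()),
        _ => None,
    }
}

fn main() {
    let from_str = parse_event_time(&Value::String("2024-01-01T00:00:05Z".into()));
    let from_ms = parse_event_time(&serde_json::json!(1_704_067_205_000i64));
    assert_eq!(from_str, from_ms);
}
```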
### Observability
- Tracing spans on operators (receive/emit) using `tracing`
- Prometheus metrics (via `pulse-core::metrics`):
- `pulse_operator_records_total{operator,stage=receive|emit}`
- `pulse_watermark_lag_ms` (gauge)
- `pulse_bytes_written_total{sink}`
- `pulse_state_size{operator}`
- `pulse_operator_process_latency_ms` (histogram)
- `pulse_sink_process_latency_ms` (histogram)
- `pulse_queue_depth` (gauge)
- `pulse_dropped_records_total{reason}` (counter)
- `/metrics` HTTP endpoint served by `pulse-bin` (axum 0.7)
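A minimal /metrics server in the spirit of pulse-bin, sketched with axum 0.7 and the prometheus crate (the real binary registers the pulse_* metrics above via `pulse-core::metrics`; the demo counter name here is made up):
```rust
// Assumed deps for this sketch: axum = "0.7", prometheus, tokio (full), anyhow.
use axum::{routing::get, Router};
use prometheus::{Encoder, TextEncoder};

/// Render the default prometheus registry in text exposition format.
async fn metrics() -> String {
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .expect("metrics encoding should not fail");
    String::from_utf8(buf).unwrap_or_default()
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Demo counter only; pulse-core registers its own pulse_* metrics instead.
    let records = prometheus::register_int_counter!(
        "pulse_demo_records_total",
        "records seen (demo)"
    ).expect("register demo counter");
    records.inc();

    let app = Router::new().route("/metrics", get(metrics));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:9898").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```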
## CLI: pulse
Binary crate: `pulse-bin`. Subcommands:
- `pulse serve --port 9898`
- Serves `/metrics` in Prometheus format.
- `pulse run --config pipeline.toml [--http-port 9898]`
- Loads a TOML config, validates it, builds the pipeline, and runs until EOF (or Ctrl-C if you wire a streaming source).
- If `--http-port` is provided, starts `/metrics` on that port.
- Optional backpressure (soft-bound) via environment: set `PULSE_CHANNEL_BOUND` (e.g., `PULSE_CHANNEL_BOUND=10000`) to drop new records when the in-flight depth reaches the bound. Watermarks are never dropped.
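The soft bound can be summarised in a few lines (a sketch, not the actual pulse-bin channel code); watermarks always pass through so downstream windows can still flush:
```rust
/// Sketch of the PULSE_CHANNEL_BOUND soft bound: when the in-flight depth
/// reaches the bound, new data records are dropped (and counted), but
/// watermark messages are always forwarded.
enum Message {
    Record(String),
    Watermark(i64),
}

fn admit(msg: &Message, queue_depth: usize, bound: Option<usize>) -> bool {
    match (msg, bound) {
        (Message::Watermark(_), _) => true, // never drop watermarks
        (_, None) => true,                  // unbounded by default
        (_, Some(b)) => queue_depth < b,    // drop once the bound is hit
    }
}

fn main() {
    let bound = std::env::var("PULSE_CHANNEL_BOUND")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .or(Some(10_000)); // fall back to a demo bound

    // Watermarks are admitted even when the queue is full.
    assert!(admit(&Message::Watermark(1_704_067_260_000), 10_000, bound));

    // A full queue rejects new records; such drops would show up in
    // pulse_dropped_records_total (reason label illustrative).
    let dropped = !admit(&Message::Record("{\"word\":\"hi\"}".into()), 10_000, bound);
    println!("dropped = {dropped}");
}
```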
### Config format (`pulse-core::config`)
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
# supported: tumbling|sliding|session
type = "sliding"
size = "60s"
slide = "15s" # for sliding; for session, use: gap = "30s"
[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count" # default
# agg_field = "value" # required for sum|avg|distinct
[sink]
kind = "parquet"
out_dir = "outputs"
## Optional Parquet settings
# compression = "snappy" # one of: snappy (default) | zstd | none
# max_bytes = 104857600 # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id" # partition by a payload field value
# partition_format = "%Y-%m" # date partition format when partitioning by event_time
```
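The four aggregations behave as their names suggest; here is a compact sketch of the per-window accumulators they imply (illustrative only, not the pulse-ops implementation):
```rust
use std::collections::HashSet;

/// Per-window accumulator for the aggregations exposed in [ops]:
/// count (default), sum, avg, distinct. sum/avg/distinct read `agg_field`.
enum Acc {
    Count(u64),
    Sum(f64),
    Avg { sum: f64, n: u64 },
    Distinct(HashSet<String>),
}

impl Acc {
    fn update(&mut self, value: &serde_json::Value) {
        match self {
            Acc::Count(c) => *c += 1,
            Acc::Sum(s) => *s += value.as_f64().unwrap_or(0.0),
            Acc::Avg { sum, n } => {
                *sum += value.as_f64().unwrap_or(0.0);
                *n += 1;
            }
            Acc::Distinct(set) => {
                set.insert(value.to_string());
            }
        }
    }

    /// Result emitted once the window's watermark condition is met.
    fn finish(&self) -> f64 {
        match self {
            Acc::Count(c) => *c as f64,
            Acc::Sum(s) => *s,
            Acc::Avg { sum, n } => if *n == 0 { 0.0 } else { *sum / *n as f64 },
            Acc::Distinct(set) => set.len() as f64,
        }
    }
}

fn main() {
    let mut avg = Acc::Avg { sum: 0.0, n: 0 };
    for v in [1.0, 2.0, 6.0] {
        avg.update(&serde_json::json!(v));
    }
    assert_eq!(avg.finish(), 3.0);
}
```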
Validation rules:
- `source.kind` must be `file` (or `kafka`)
- `sink.kind` must be `parquet`/`file` (or `kafka`)
- `ops.count_by` must be present
### Example: run from config
```powershell
# Build
cargo build
# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898
# Scrape metrics
curl http://127.0.0.1:9898/metrics
```
Expected output:
- Parquet files created under `outputs/dt=YYYY-MM-DD/part-*.parquet`.
### Example: CSV → Parquet
The CLI supports both JSONL and CSV input, selected via `source.format`.
1) Direct CSV in the CLI:
```toml
[source]
kind = "file"
path = "input.csv"
format = "csv"            # source.format: csv (the JSONL example above omits this)
time_field = "event_time" # epoch ms in CSV
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"
[sink]
kind = "parquet"
out_dir = "outputs"
```
2) Use the Rust API directly:
```rust
use pulse_core::Executor;
use pulse_io::{FileSource, ParquetSink, ParquetSinkConfig, PartitionSpec};
use pulse_ops::{KeyBy, WindowedAggregate};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // CSV source: the header must include event_time (epoch ms)
    let src = FileSource {
        path: "input.csv".into(),
        format: pulse_io::FileFormat::Csv,
        event_time_field: "event_time".into(),
        text_field: None,
    };

    let mut exec = Executor::new();
    exec.source(src)
        .operator(KeyBy::new("word"))
        .operator(WindowedAggregate::tumbling_count("key", 60_000))
        .sink(ParquetSink::new(ParquetSinkConfig {
            out_dir: "outputs".into(),
            partition_by: PartitionSpec::ByDate { field: "event_time".into(), fmt: "%Y-%m-%d".into() },
            max_rows: 1_000_000,
            max_age: std::time::Duration::from_secs(300),
        }));

    exec.run().await?;
    Ok(())
}
```
### Examples: other aggregations in the CLI
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"   # key field
agg = "avg"         # or: sum | distinct (count is the default)
agg_field = "value" # required for sum|avg|distinct
[sink]
kind = "parquet"
out_dir = "outputs"
```
### Examples crate
- Data examples under `pulse-examples/examples/`
- Sample dataset: `sliding_avg.jsonl` works with the file source
- You can also wire other examples in `pulse-examples` using the operators from `pulse-ops`
## Optional integrations
Enable per crate features:
```powershell
# Kafka
cargo build -p pulse-io --features kafka
# Arrow/Parquet
cargo build -p pulse-io --features parquet
```
Windows + Kafka notes: enabling `kafka` builds the native `librdkafka` by default and requires CMake and MSVC Build Tools.
- Install CMake and Visual Studio Build Tools (C++), then rerun the build
- Or link against a preinstalled librdkafka and disable the `cmake-build` feature
## Running tests
From the workspace root, you can run tests per crate. Some features are crate-specific:
```powershell
# Operators crate
cargo test -p pulse-ops -- --nocapture
# I/O crate with Parquet
cargo test -p pulse-io --features parquet -- --nocapture
# CLI crate (includes end-to-end goldens)
cargo test -p pulse-bin -- --nocapture
# All workspace tests (no extra features)
cargo test -- --nocapture
```
Notes:
- Only the `pulse-io` crate defines the `parquet` feature. Do not pass `--features parquet` to other crates.
- The `kafka` feature is also only on `pulse-io` and requires native dependencies on Windows.
## Why not Flink / Arroyo / Fluvio / Materialize?
Pulse takes a pragmatic, local-first approach for single-node pipelines. A comparison at a glance:
| System | Footprint | Local-first | Event-time windowing | SQL | Distributed | Best for |
| --- | --- | --- | --- | --- | --- | --- |
| Flink | Heavy (cluster/services) | No (dev spins up a cluster) | Yes (rich) | Yes | Yes | Large-scale distributed streaming |
| Arroyo | Moderate (service + workers) | Partial | Yes | Yes | Yes | Cloud-native streaming w/ SQL |
| Fluvio | Broker + clients | Partial | Limited | No | Yes | Distributed streaming data plane |
| Materialize | Service + storage | Partial | Incremental views | Yes | Yes | Streaming SQL materialized views |
| Pulse | Single binary | Yes | Yes (MVP-focused) | No | No | Local-first pipelines & testing |
If you need distributed scale, multi-tenant scheduling, or a SQL-first experience, those systems are a better fit. Pulse aims to be the simplest way to iterate on event-time pipelines locally and ship small, self-contained jobs.