Pulse
| Crate | crates.io | docs.rs |
|---|---|---|
| pulse-core | | |
| pulse-state | | |
| pulse-ops | | |
| pulse-io | | |
Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals:
- Local-first: zero external services required for the common path
- Stream processing with event-time and watermarks
- Configurable windowing (tumbling/sliding/session), also available from the CLI
- Pluggable state backends (in-memory and optional RocksDB)
- File and Parquet I/O, plus optional Kafka
- First-class observability (tracing/Prometheus)
Local-first vision
- Single binary you can run on your laptop or inside a container without standing up a cluster or control plane.
- Files-in/files-out by default; opt into Kafka when needed.
- Deterministic replay support (EOF watermark) so you can iterate quickly and write golden tests.
- Ergonomics first: small, readable codebase; simple config; sensible defaults.
Workspace structure
- `pulse-core`: core types/traits, `Executor`, `Record`, event-time `Watermark`, timers, metrics, and config loader
- `pulse-ops`: operators (`Map`, `Filter`, `KeyBy`, `Aggregate`, `WindowedAggregate`) and event-time window helpers
- `pulse-state`: state backends
  - `InMemoryState` (default)
  - `RocksDbState` (feature `rocksdb`)
- `pulse-io`: sources/sinks
  - `FileSource` (JSONL/CSV) with EOF watermark
  - `FileSink` (JSONL)
  - `ParquetSink` (feature `parquet`) with date partitioning and rotation
  - `KafkaSource`/`KafkaSink` (feature `kafka`) with resume from persisted offsets
- `pulse-examples`: runnable examples and sample data
- `pulse-bin`: CLI (`pulse`) and `/metrics` HTTP server
Core features
Event-time & watermarks
- `Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }`
- `Watermark(EventTime)` propagated through the pipeline
- `Executor` drives `on_watermark` for operators and informs sinks
- Lag metric (`pulse_watermark_lag_ms`) = now - watermark
Semantics overview:
- Event-time is derived from your data (RFC3339 string or epoch ms).
- Sources advance a low watermark to signal “no more records ≤ t expected”.
- Operators emit window results when `watermark >= window.end`.
- `FileSource` emits a final EOF watermark far in the future to flush all windows in batch-like runs.
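The semantics above can be sketched in a few lines of std-only Rust. This is an illustration, not Pulse's executor: event time is assumed to be epoch milliseconds, `BoundedOutOfOrderness` is one common way to derive a low watermark (max event time seen minus a fixed bound) and is an assumption here, and all names are hypothetical. It shows the firing rule `watermark >= window.end`, the far-future EOF watermark flushing everything, and the lag computation.

```rust
/// Event time in milliseconds since the Unix epoch (assumed representation).
type EventTimeMs = i64;

/// Half-open event-time window [start, end).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Window {
    start: EventTimeMs,
    end: EventTimeMs,
}

/// Hypothetical stand-in for the "far future" watermark a file source emits at EOF.
const EOF_WATERMARK: EventTimeMs = i64::MAX;

/// One common watermark strategy (an assumption, not necessarily Pulse's):
/// the largest event time seen so far minus a fixed out-of-orderness bound.
struct BoundedOutOfOrderness {
    max_seen: EventTimeMs,
    bound_ms: i64,
}

impl BoundedOutOfOrderness {
    fn new(bound_ms: i64) -> Self {
        Self { max_seen: i64::MIN, bound_ms }
    }

    /// Observes one record's event time and returns the current low watermark.
    /// Late records never move the watermark backwards.
    fn observe(&mut self, event_time: EventTimeMs) -> EventTimeMs {
        self.max_seen = self.max_seen.max(event_time);
        self.max_seen.saturating_sub(self.bound_ms)
    }
}

/// Removes and returns the windows that may fire (watermark >= end);
/// windows that are still open stay in `pending`.
fn fire_ready(pending: &mut Vec<Window>, watermark: EventTimeMs) -> Vec<Window> {
    let (ready, open): (Vec<Window>, Vec<Window>) =
        pending.iter().copied().partition(|w| watermark >= w.end);
    *pending = open;
    ready
}

/// Lag as a gauge like `pulse_watermark_lag_ms` would report it: now - watermark.
fn watermark_lag_ms(now_ms: i64, watermark: EventTimeMs) -> i64 {
    now_ms.saturating_sub(watermark)
}
```

Because the EOF watermark is `i64::MAX`, every pending window satisfies the firing rule, which is what makes file-based runs deterministic.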
Windowing
- Operators: `WindowedAggregate` supports tumbling, sliding, and session windows.
- The CLI supports tumbling, sliding, and session windows with the aggregations `count`, `sum`, `avg`, and `distinct`.
- The EOF watermark emitted by `FileSource` flushes all open windows.
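For intuition about the three window types listed above, here is a minimal, std-only sketch of window assignment over epoch-millisecond timestamps. It is not the helper code in `pulse-ops`; the function names are hypothetical, windows are assumed half-open `[start, end)`, and sessions are assumed to close `gap` after their last event.

```rust
/// Half-open window [start, end) in epoch milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Window {
    start: i64,
    end: i64,
}

/// Tumbling: every timestamp belongs to exactly one window of length `size`.
/// `rem_euclid` keeps the alignment correct for negative timestamps too.
fn tumbling(ts: i64, size: i64) -> Window {
    let start = ts - ts.rem_euclid(size);
    Window { start, end: start + size }
}

/// Sliding: windows of length `size` start every `slide` ms; a timestamp
/// belongs to every window whose [start, end) contains it.
fn sliding(ts: i64, size: i64, slide: i64) -> Vec<Window> {
    let mut out = Vec::new();
    let mut start = ts - ts.rem_euclid(slide); // latest window start <= ts
    while start > ts - size {
        out.push(Window { start, end: start + size });
        start -= slide;
    }
    out.reverse(); // oldest window first
    out
}

/// Session: sort the timestamps and open a new session whenever the distance
/// to the previous event is at least `gap`; a session ends `gap` after its last event.
fn sessions(mut ts: Vec<i64>, gap: i64) -> Vec<Window> {
    ts.sort_unstable();
    let mut out: Vec<Window> = Vec::new();
    for t in ts {
        let extends_last = matches!(out.last(), Some(w) if t < w.end);
        if extends_last {
            if let Some(w) = out.last_mut() {
                w.end = t + gap; // push the session end forward
            }
        } else {
            out.push(Window { start: t, end: t + gap });
        }
    }
    out
}
```

With a 60 s window and a 15 s slide, `sliding(70_000, 60_000, 15_000)` yields four windows, starting at 15 s, 30 s, 45 s, and 60 s.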
State & snapshots
- `KvState` trait: `get`/`put`/`delete`/`iter_prefix`/`snapshot`/`restore`
- `pulse-state::InMemoryState` implements the full API (snapshots kept in memory)
- `pulse-state::RocksDbState` (feature `rocksdb`): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- `WindowOperator` can persist per-window state via a backend and restore it after a restart (optional hook)
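To make the `KvState` operation list concrete, the following sketch implements the same six operations over a `BTreeMap`. The trait `MiniKv` and its signatures are hypothetical and may differ from the real trait in `pulse-state`; the point is that a sorted map turns `iter_prefix` into a contiguous range scan and `snapshot`/`restore` into a clone and a swap.

```rust
use std::collections::BTreeMap;

/// Hypothetical trait mirroring the operations listed for `KvState`;
/// the real signatures in `pulse-state` may differ.
trait MiniKv {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn put(&mut self, key: &[u8], value: Vec<u8>);
    fn delete(&mut self, key: &[u8]);
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    fn snapshot(&self) -> BTreeMap<Vec<u8>, Vec<u8>>;
    fn restore(&mut self, snap: BTreeMap<Vec<u8>, Vec<u8>>);
}

/// In-memory backend: keys are kept sorted, so all keys sharing a prefix are adjacent.
#[derive(Default)]
struct MemKv {
    map: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MiniKv for MemKv {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.map.get(key).cloned()
    }
    fn put(&mut self, key: &[u8], value: Vec<u8>) {
        self.map.insert(key.to_vec(), value);
    }
    fn delete(&mut self, key: &[u8]) {
        self.map.remove(key);
    }
    /// Range scan starting at `prefix`, stopping at the first non-matching key.
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.map
            .range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }
    /// A full copy of the state; a disk backend would write a checkpoint instead.
    fn snapshot(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
        self.map.clone()
    }
    fn restore(&mut self, snap: BTreeMap<Vec<u8>, Vec<u8>>) {
        self.map = snap;
    }
}
```

Prefixing keys by purpose (for example a hypothetical `win:` for window state and `off:` for source offsets) lets one store hold several kinds of state and still scan each kind cheaply.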
Guarantees (MVP)
- Single-node at-least-once: each record is processed at least once; exactly-once is not guaranteed.
- FileSource is deterministic (EOF watermark). KafkaSource commits offsets periodically (configurable); on restart, offsets are available in KvState for recovery logic.
- Operator state updates and sink writes are not transactional as a unit (no two-phase commit in MVP).
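The at-least-once guarantee above means that a restart can replay records whose sink writes already happened. The simulation below illustrates that effect under stated assumptions: a source commits its read offset every `commit_every` records, resumes from the last committed offset after a crash, and treats a clean end of input as fully committed. The functions `run` and `dedup` are hypothetical and not Pulse APIs.

```rust
use std::collections::HashSet;

/// Processes `records` from `resume_from`, committing the read offset every
/// `commit_every` records, and "crashes" just before reading offset `crash_at`.
/// Returns the offset a restart would resume from.
fn run(
    records: &[&str],
    resume_from: usize,
    commit_every: usize,
    crash_at: Option<usize>,
    sink: &mut Vec<String>,
) -> usize {
    let mut committed = resume_from;
    for i in resume_from..records.len() {
        if crash_at == Some(i) {
            return committed; // writes after `committed` were never acknowledged
        }
        sink.push(records[i].to_string()); // not transactional with the commit
        if (i + 1) % commit_every == 0 {
            committed = i + 1; // periodic offset commit
        }
    }
    records.len() // clean end of input: treat everything as committed
}

/// An idempotent consumer can drop replayed duplicates by record identity.
fn dedup(sink: &[String]) -> Vec<String> {
    let mut seen = HashSet::new();
    sink.iter().filter(|r| seen.insert(r.as_str())).cloned().collect()
}
```

Running `a..e` with `commit_every = 2` and a crash before offset 3 commits offset 2, so the restart rewrites `c`: the sink ends up with `a, b, c, c, d, e`. Deduplicating by a record id, or making the sink idempotent, is the usual way to live with this until writes and commits become transactional.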
Limitations (MVP)
- No cluster/distributed runtime yet (single process, single binary).
- No SQL/DSL planner; define pipelines in Rust or via TOML.
- Checkpoint/resume orchestration is minimal: offsets/snapshots exist, but full CLI-driven recovery is a follow-up.
- Kafka is optional and depends on the native `librdkafka` library.
I/O
- `FileSource` (JSONL/CSV): parses `event_time` from RFC3339 or epoch ms; emits a final watermark at EOF
- `FileSink`: writes JSON lines to stdout or a file
- `ParquetSink` (feature `parquet`):
  - Schema: `event_time: timestamp(ms)`, `payload: utf8` (full JSON)
  - Partitioning:
    - By date (default): `out_dir/dt=YYYY-MM-DD/part-*.parquet` (format configurable via `partition_format`)
    - By field: `out_dir/<field>=<value>/part-*.parquet` (set `partition_field`)
  - Rotation: by row count, time, and optionally bytes (`max_bytes`)
  - Compression: `snappy` (default), `zstd`, or `none`
  - Tested: writes files, then reads them back via the Arrow reader and asserts row counts
- `KafkaSource`/`KafkaSink` (feature `kafka`): integration with `rdkafka`, resuming offsets from persisted state
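The Parquet partitioning and rotation rules above can be sketched without any date library. The code below is an illustration, not `ParquetSink`'s implementation: it converts epoch milliseconds to a UTC calendar date with Howard Hinnant's civil-from-days algorithm, builds the default `dt=YYYY-MM-DD` path (it does not handle a custom `partition_format`), and checks a rotation policy. `RotationPolicy`, `max_rows`, and `max_age_ms` are hypothetical names; only `max_bytes` mirrors a real config key.

```rust
/// Days since 1970-01-01 -> (year, month, day) in the proleptic Gregorian
/// calendar, following Howard Hinnant's civil-from-days algorithm.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // March = 0 .. February = 11
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let y = yoe + era * 400 + if m <= 2 { 1 } else { 0 };
    (y, m, d)
}

/// Default date partition directory, e.g. `outputs/dt=2023-11-14` (UTC).
fn date_partition(out_dir: &str, event_time_ms: i64) -> String {
    let (y, m, d) = civil_from_days(event_time_ms.div_euclid(86_400_000));
    format!("{out_dir}/dt={y:04}-{m:02}-{d:02}")
}

/// Field partition directory, e.g. `outputs/user_id=42`.
fn field_partition(out_dir: &str, field: &str, value: &str) -> String {
    format!("{out_dir}/{field}={value}")
}

/// Hypothetical rotation thresholds: rows, file age, and optional bytes.
struct RotationPolicy {
    max_rows: u64,
    max_age_ms: i64,
    max_bytes: Option<u64>,
}

impl RotationPolicy {
    /// A new part file is started as soon as any threshold is reached.
    fn should_rotate(&self, rows: u64, opened_at_ms: i64, now_ms: i64, bytes: u64) -> bool {
        rows >= self.max_rows
            || now_ms - opened_at_ms >= self.max_age_ms
            || self.max_bytes.map_or(false, |limit| bytes >= limit)
    }
}
```

For example, an event at `1_700_000_000_000` ms (2023-11-14T22:13:20Z) lands under `outputs/dt=2023-11-14`.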
Observability
- Tracing spans on operators (receive/emit) using `tracing`
- Prometheus metrics (via `pulse-core::metrics`):
  - `pulse_operator_records_total{operator,stage=receive|emit}`
  - `pulse_watermark_lag_ms` (gauge)
  - `pulse_bytes_written_total{sink}`
  - `pulse_state_size{operator}`
  - `pulse_operator_process_latency_ms` (histogram)
  - `pulse_sink_process_latency_ms` (histogram)
  - `pulse_queue_depth` (gauge)
  - `pulse_dropped_records_total{reason}` (counter)
- `/metrics` HTTP endpoint served by `pulse-bin` (axum 0.7)
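For a sense of what a scrape of `/metrics` returns, this hand-rolled sketch renders a labeled counter such as `pulse_operator_records_total` in the Prometheus text exposition format. It only illustrates the wire format; Pulse's own `pulse-core::metrics` code is not shown here and likely relies on a metrics library, and `CounterFamily` is a hypothetical type.

```rust
use std::collections::BTreeMap;
use std::fmt::Write;

/// Escapes a label value per the text format (backslash, double quote, newline).
fn escape_label(v: &str) -> String {
    v.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// A labeled counter family: one time series per distinct label set.
struct CounterFamily {
    name: &'static str,
    help: &'static str,
    /// Sorted label pairs -> current value.
    series: BTreeMap<Vec<(String, String)>, u64>,
}

impl CounterFamily {
    fn new(name: &'static str, help: &'static str) -> Self {
        Self { name, help, series: BTreeMap::new() }
    }

    /// Increments the series for `labels`; label order does not matter.
    fn inc(&mut self, labels: &[(&str, &str)]) {
        let mut key: Vec<(String, String)> = labels
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        key.sort();
        *self.series.entry(key).or_insert(0) += 1;
    }

    /// Renders `# HELP`, `# TYPE`, and one sample line per series.
    fn render(&self) -> String {
        let mut out = String::new();
        writeln!(out, "# HELP {} {}", self.name, self.help).unwrap();
        writeln!(out, "# TYPE {} counter", self.name).unwrap();
        for (labels, value) in &self.series {
            let pairs: Vec<String> = labels
                .iter()
                .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
                .collect();
            writeln!(out, "{}{{{}}} {}", self.name, pairs.join(","), value).unwrap();
        }
        out
    }
}
```

After two `receive` increments for a `map` operator, the rendered output contains the line `pulse_operator_records_total{operator="map",stage="receive"} 2`.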
CLI: pulse
Binary crate: pulse-bin. Subcommands:
- `pulse serve --port 9898`
  - Serves `/metrics` in Prometheus format.
- `pulse run --config pipeline.toml [--http-port 9898]`
  - Loads a TOML config, validates it, builds the pipeline, and runs until EOF (or until Ctrl-C if you wire a streaming source).
  - If `--http-port` is provided, serves `/metrics` on that port.
  - Optional backpressure (soft bound) via environment: set `PULSE_CHANNEL_BOUND` (e.g., `PULSE_CHANNEL_BOUND=10000`) to drop new records when the in-flight depth reaches the bound. Watermarks are never dropped.
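The soft-bound backpressure rule above (drop new records at the bound, never drop watermarks) can be modeled as a small queue. This is a single-threaded sketch under assumptions: queue length stands in for "in-flight depth", and `SoftBoundedQueue` and `Msg` are hypothetical names, not Pulse's channel types.

```rust
use std::collections::VecDeque;

/// Messages flowing between pipeline stages.
#[derive(Debug, PartialEq)]
enum Msg {
    Record(String),
    Watermark(i64),
}

/// Soft-bounded queue: once `bound` messages are in flight, new records are
/// dropped and counted; watermarks are always enqueued so event time advances.
struct SoftBoundedQueue {
    items: VecDeque<Msg>,
    bound: Option<usize>,
    dropped_records: u64,
}

impl SoftBoundedQueue {
    fn new(bound: Option<usize>) -> Self {
        Self { items: VecDeque::new(), bound, dropped_records: 0 }
    }

    /// Reads an optional bound such as `PULSE_CHANNEL_BOUND=10000`;
    /// an unset or unparsable variable means "unbounded".
    fn bound_from_env(var: &str) -> Option<usize> {
        std::env::var(var).ok()?.parse().ok()
    }

    /// Returns `false` if the message was dropped.
    fn push(&mut self, msg: Msg) -> bool {
        let full = self.bound.map_or(false, |b| self.items.len() >= b);
        match msg {
            Msg::Record(_) if full => {
                self.dropped_records += 1; // would back a counter like pulse_dropped_records_total
                false
            }
            other => {
                self.items.push_back(other); // watermarks always get here
                true
            }
        }
    }

    fn pop(&mut self) -> Option<Msg> {
        self.items.pop_front()
    }

    /// Current depth, the kind of value a gauge like pulse_queue_depth exposes.
    fn depth(&self) -> usize {
        self.items.len()
    }
}
```

Letting watermarks bypass the bound is what keeps windows firing even while records are being shed; the trade-off is that dropped records never reach their windows, so results under pressure are lossy by design.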
Config format (pulse-core::config)
```toml
[source]
kind = "file"
= "pulse-examples/examples/sliding_avg.jsonl"
= "event_time"

[]
= "10s"

[]
# supported: tumbling|sliding|session
= "sliding"
= "60s"
= "15s" # for sliding; for session, use: gap = "30s"

[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count" # default
# agg_field = "value" # required for sum|avg|distinct

[sink]
kind = "parquet"
= "outputs"
## Optional Parquet settings
# compression = "snappy" # one of: snappy (default) | zstd | none
# max_bytes = 104857600 # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id" # partition by a payload field value
# partition_format = "%Y-%m" # date partition format when partitioning by event_time
```
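The config writes durations as strings such as `"10s"`, `"60s"`, and `"30s"`. A minimal sketch of parsing them into `std::time::Duration` follows; the accepted suffixes (`ms`, `s`, `m`, `h`) are an assumption, since the exact set Pulse accepts is not documented here, and `parse_duration` is a hypothetical helper.

```rust
use std::time::Duration;

/// Parses "<digits><unit>" such as "10s" or "250ms" into a Duration.
/// Assumed units: ms, s, m, h.
fn parse_duration(s: &str) -> Result<Duration, String> {
    let s = s.trim();
    // Split at the first non-digit character: "60s" -> ("60", "s").
    let split = s
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in {s:?}"))?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().map_err(|e| format!("bad number in {s:?}: {e}"))?;
    let ms_per_unit = match unit {
        "ms" => 1,
        "s" => 1_000,
        "m" => 60_000,
        "h" => 3_600_000,
        other => return Err(format!("unknown unit {other:?}")),
    };
    n.checked_mul(ms_per_unit)
        .map(Duration::from_millis)
        .ok_or_else(|| format!("duration {s:?} overflows"))
}
```

If `"60s"` is the window length and `"15s"` the slide, as the comments suggest, each event falls into 60 / 15 = 4 overlapping windows.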
Validation rules:
- `source.kind` must be `file` (or `kafka`)
- `sink.kind` must be `parquet` or `file` (or `kafka`)
- `ops.count_by` must be present
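The validation rules above translate directly into code. This sketch uses a hypothetical, flattened `PipelineConfig` rather than the real `pulse-core::config` types, and it also enforces the `agg_field` requirement stated in the config comments; whether Pulse's validator checks that rule is not documented here. Collecting all violations, rather than stopping at the first, gives friendlier CLI errors.

```rust
/// Hypothetical flattened view of the fields the rules mention.
struct PipelineConfig {
    source_kind: String,
    sink_kind: String,
    count_by: Option<String>,
    agg: Option<String>,
    agg_field: Option<String>,
}

/// Returns every rule violation instead of stopping at the first one.
fn validate(cfg: &PipelineConfig) -> Result<(), Vec<String>> {
    let mut errors = Vec::new();
    if !matches!(cfg.source_kind.as_str(), "file" | "kafka") {
        errors.push(format!("source.kind must be file or kafka, got {:?}", cfg.source_kind));
    }
    if !matches!(cfg.sink_kind.as_str(), "parquet" | "file" | "kafka") {
        errors.push(format!("sink.kind must be parquet, file, or kafka, got {:?}", cfg.sink_kind));
    }
    if cfg.count_by.is_none() {
        errors.push("ops.count_by must be present".to_string());
    }
    // `agg` defaults to count; sum/avg/distinct need a field to aggregate.
    match cfg.agg.as_deref().unwrap_or("count") {
        "count" => {}
        "sum" | "avg" | "distinct" => {
            if cfg.agg_field.is_none() {
                errors.push("ops.agg_field is required for sum|avg|distinct".to_string());
            }
        }
        other => errors.push(format!("unsupported ops.agg {other:?}")),
    }
    if errors.is_empty() { Ok(()) } else { Err(errors) }
}
```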
Example: run from config
```bash
# Build
cargo build
# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898
# Scrape metrics
curl http://127.0.0.1:9898/metrics
```
Expected output:
- Parquet files created under `outputs/dt=YYYY-MM-DD/part-*.parquet`.
Example: CSV → Parquet
The CLI supports JSONL and CSV via `source.format`.
- Direct CSV in the CLI:

```toml
[source]
kind = "file"
format = "csv" # jsonl | csv
= "input.csv"
= "event_time" # epoch ms in CSV

[]
= "10s"

[]
= "tumbling"
= "60s"

[ops]
count_by = "word"

[sink]
kind = "parquet"
= "outputs"
```
- Use the Rust API directly:

```rust
use Executor;
use ;
use ;
async
```
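The CSV path relies on an `event_time` column holding epoch milliseconds. The following std-only sketch shows that extraction step under simplifying assumptions: a header row, comma separators, and no quoted fields. It is not `FileSource`'s parser, it ignores the RFC3339 form, and `csv_event_times` is a hypothetical helper.

```rust
/// Returns the epoch-ms event time of every data row, looking the column up
/// by name in the header. Assumes simple CSV: commas only, no quoting.
fn csv_event_times(csv: &str, time_field: &str) -> Result<Vec<i64>, String> {
    let mut lines = csv.lines().filter(|l| !l.trim().is_empty());
    let header = lines.next().ok_or("empty input")?;
    let idx = header
        .split(',')
        .position(|h| h.trim() == time_field)
        .ok_or_else(|| format!("no column named {time_field:?}"))?;
    lines
        .map(|row| -> Result<i64, String> {
            let cell = row
                .split(',')
                .nth(idx)
                .ok_or_else(|| format!("short row: {row:?}"))?;
            cell.trim()
                .parse::<i64>()
                .map_err(|e| format!("bad epoch ms {cell:?}: {e}"))
        })
        .collect() // stops at the first bad row
}
```

Failing on the first unparsable timestamp is a deliberate choice in this sketch; a streaming source might instead skip the row and count it under a drop reason.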
Examples: other aggregations in the CLI
```toml
[source]
kind = "file"
= "pulse-examples/examples/sliding_avg.jsonl"
= "event_time"

[]
= "10s"

[]
= "tumbling"
= "60s"

[ops]
count_by = "word"
agg = "avg" # or: sum | distinct | count
agg_field = "score" # required for avg/sum/distinct

[sink]
kind = "parquet"
= "outputs"
```
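To show what the four aggregations compute per key within one window, here is a std-only sketch. It is an illustration, not `WindowedAggregate`: rows are assumed to be `(key, agg_field value)` pairs already assigned to the same window, values are assumed numeric, and `Agg`/`aggregate` are hypothetical names. One accumulator per key keeps enough state (count, sum, distinct set) to answer any of the four.

```rust
use std::collections::{BTreeMap, HashSet};

/// The aggregations named in the config comments.
#[derive(Clone, Copy)]
enum Agg {
    Count,
    Sum,
    Avg,
    Distinct,
}

/// Per-key accumulator.
#[derive(Default)]
struct Acc {
    count: u64,
    sum: f64,
    distinct: HashSet<u64>, // f64 bit patterns, so equal values collapse
}

/// Aggregates the (key, value) rows of one window, one output per key.
fn aggregate(rows: &[(&str, f64)], agg: Agg) -> BTreeMap<String, f64> {
    let mut accs: BTreeMap<String, Acc> = BTreeMap::new();
    for (key, value) in rows {
        let acc = accs.entry(key.to_string()).or_default();
        acc.count += 1;
        acc.sum += value;
        acc.distinct.insert(value.to_bits());
    }
    accs.into_iter()
        .map(|(k, a)| {
            let out = match agg {
                Agg::Count => a.count as f64,
                Agg::Sum => a.sum,
                Agg::Avg => a.sum / a.count as f64,
                Agg::Distinct => a.distinct.len() as f64,
            };
            (k, out)
        })
        .collect()
}
```

Keeping `count` and `sum` instead of a running average means partial accumulators can be merged exactly, which matters for sliding windows that share events.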
Examples crate
- Data examples under `pulse-examples/examples/`
- Sample dataset: `sliding_avg.jsonl` (generated earlier) works with the file source
- You can also wire other examples in `pulse-examples` using the operators from `pulse-ops`
Optional integrations
Enable features per crate:

```bash
# Kafka
cargo build -p pulse-io --features kafka
# Arrow/Parquet
cargo build -p pulse-io --features parquet
```
Windows + Kafka notes: enabling `kafka` builds the native `librdkafka` by default, which requires CMake and the MSVC Build Tools.
- Install CMake and Visual Studio Build Tools (C++), then rerun the build
- Or link against a preinstalled `librdkafka` and remove the `cmake-build` feature
Running tests
From the workspace root, you can run tests per crate. Some features are crate-specific:
```bash
# Operators crate
cargo test -p pulse-ops -- --nocapture
# I/O crate with Parquet
cargo test -p pulse-io --features parquet -- --nocapture
# CLI crate (includes end-to-end goldens)
cargo test -p pulse-bin -- --nocapture
# All workspace tests (no extra features)
cargo test -- --nocapture
```
Notes:
- Only the `pulse-io` crate defines the `parquet` feature. Do not pass `--features parquet` to other crates.
- The `kafka` feature is also defined only on `pulse-io` and requires native dependencies on Windows.
Why not Flink / Arroyo / Fluvio / Materialize?
Pulse takes a pragmatic, local-first approach for single-node pipelines. A comparison at a glance:
| System | Install/runtime footprint | Local-first UX | Event-time/windowing | SQL | Cluster | Primary niche |
|---|---|---|---|---|---|---|
| Flink | Heavy (cluster/services) | No (dev spins cluster) | Yes (rich) | Yes | Yes | Large-scale distributed streaming |
| Arroyo | Moderate (service + workers) | Partial | Yes | Yes | Yes | Cloud-native streaming w/ SQL |
| Fluvio | Broker + clients | Partial | Limited | No | Yes | Distributed streaming data plane |
| Materialize | Service + storage | Partial | Incremental views | Yes | Yes | Streaming SQL materialized views |
| Pulse | Single binary | Yes | Yes (MVP focused) | No | No | Local-first pipelines & testing |
If you need distributed scale, multi-tenant scheduling, or a SQL-first experience, those systems are a better fit. Pulse aims to be the simplest way to iterate on event-time pipelines locally and ship small, self-contained jobs.