Pulse
| Crate | crates.io | docs.rs | 
|---|---|---|
| pulse-core | ||
| pulse-state | ||
| pulse-ops | ||
| pulse-io | | |
Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals:
- Local-first: zero external services required for the common path
- Stream processing with event-time and watermarks
- Configurable windowing (tumbling/sliding/session), also available from the CLI
- Pluggable state backends (in-memory and optional RocksDB)
- File and Parquet I/O, plus optional Kafka
- First-class observability (tracing/Prometheus)
Local-first vision
- Single binary you can run on your laptop or inside a container without standing up a cluster or control plane.
- Files-in/files-out by default; opt into Kafka when needed.
- Deterministic replay support (EOF watermark) so you can iterate quickly and write golden tests.
- Ergonomics first: small, readable codebase; simple config; sensible defaults.
Workspace structure
- pulse-core: core types/traits, `Executor`, `Record`, event-time `Watermark`, timers, metrics, and config loader
- pulse-ops: operators (`Map`, `Filter`, `KeyBy`, `Aggregate`, `WindowedAggregate`), event-time window helpers
- pulse-state: state backends
  - `InMemoryState` (default)
  - `RocksDbState` (feature `rocksdb`)
- pulse-io: sources/sinks
  - `FileSource` (JSONL/CSV) with EOF watermark
  - `FileSink` (JSONL)
  - `ParquetSink` (feature `parquet`) with date partitioning and rotation
  - `KafkaSource`/`KafkaSink` (feature `kafka`) with resume from persisted offsets
- pulse-examples: runnable examples and sample data
- pulse-bin: CLI (`pulse`) and `/metrics` HTTP server
Core features
Event-time & watermarks
- `Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }`
- `Watermark(EventTime)` propagated through the pipeline
- `Executor` drives `on_watermark` for operators and informs sinks
- Lag metric (`pulse_watermark_lag_ms`) = now - watermark
Semantics overview:
- Event-time is derived from your data (RFC3339 string or epoch ms).
- Sources advance a low watermark to signal “no more records ≤ t expected”.
- Operators emit window results when watermark >= window.end.
- FileSource emits a final EOF watermark far in the future to flush all windows in batch-like runs.
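For orientation, here is a minimal sketch of deriving an event time from a JSON field in either representation; the helper name and exact handling are assumptions, not the crate's API.

```rust
use chrono::{DateTime, TimeZone, Utc};
use serde_json::Value;

/// Hypothetical helper: derive event time from a JSON field that is either
/// an RFC3339 string or an integer of epoch milliseconds.
fn parse_event_time(field: &Value) -> Option<DateTime<Utc>> {
    match field {
        Value::String(s) => DateTime::parse_from_rfc3339(s)
            .ok()
            .map(|dt| dt.with_timezone(&Utc)),
        Value::Number(n) => n
            .as_i64()
            .and_then(|ms| Utc.timestamp_millis_opt(ms).single()),
        _ => None,
    }
}
```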
Windowing
- Operators: `WindowedAggregate` supports Tumbling/Sliding/Session
- The CLI supports tumbling, sliding, and session windows with aggregations: `count`, `sum`, `avg`, `distinct`
- The EOF watermark emitted by `FileSource` flushes windows
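A sketch of the underlying arithmetic, assuming millisecond-aligned tumbling windows (illustrative only, not pulse-ops internals):

```rust
use chrono::{DateTime, Duration, TimeZone, Utc};

/// Sketch: align an event time to its tumbling window [start, end).
fn tumbling_window(event_time: DateTime<Utc>, size: Duration) -> (DateTime<Utc>, DateTime<Utc>) {
    let size_ms = size.num_milliseconds();
    let start_ms = event_time.timestamp_millis() / size_ms * size_ms;
    let start = Utc.timestamp_millis_opt(start_ms).unwrap();
    (start, start + size)
}

/// Sketch: a window result may be emitted once the watermark passes its end.
fn ready_to_emit(watermark: DateTime<Utc>, window_end: DateTime<Utc>) -> bool {
    watermark >= window_end
}
```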
State & snapshots
- `KvState` trait: `get`/`put`/`delete`/`iter_prefix`/`snapshot`/`restore`
- `pulse-state::InMemoryState` implements the full API (snapshots kept in memory)
- `pulse-state::RocksDbState` (feature `rocksdb`): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- `WindowOperator` can persist per-window state via a backend and restore it after a restart (optional hook)
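For orientation, an approximate shape of such a backend trait is sketched below; the method names follow the list above, while the exact signatures are assumptions:

```rust
/// Approximate shape of a KvState-style backend (illustrative only;
/// see pulse-core/pulse-state for the real trait).
pub trait KvState {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn delete(&mut self, key: &[u8]);
    /// All entries whose key starts with `prefix`.
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    /// Persist a point-in-time copy that `restore` can later load.
    fn snapshot(&self, id: &str);
    fn restore(&mut self, id: &str);
}
```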
Guarantees (MVP)
- Single-node at-least-once: each record is processed at least once; exactly-once is not guaranteed.
- `FileSource` is deterministic (EOF watermark). `KafkaSource` commits offsets periodically (configurable); on restart, offsets are available in `KvState` for recovery logic.
- Operator state updates and sink writes are not transactional as a unit (no two-phase commit in MVP).
Limitations (MVP)
- No cluster/distributed runtime yet (single process, single binary).
- No SQL/DSL planner; define pipelines in Rust or via TOML.
- Checkpoint/resume orchestration is minimal: offsets/snapshots exist, but full CLI-driven recovery is a follow-up.
- Kafka is optional and depends on native librdkafka.
I/O
- `FileSource` (JSONL/CSV): parses `event_time` from RFC3339 or epoch ms; final watermark at EOF
- `FileSink`: writes JSON lines to stdout/file
- `ParquetSink` (feature `parquet`):
  - Schema: `event_time: timestamp(ms)`, `payload: utf8` (full JSON)
  - Partitioning:
    - By date (default): `out_dir/dt=YYYY-MM-DD/part-*.parquet` (configurable format via `partition_format`)
    - By field: `out_dir/<field>=<value>/part-*.parquet` (set `partition_field`)
  - Rotation: by row count, time, and optional bytes (`max_bytes`)
  - Compression: `snappy` (default), `zstd`, or `none`
  - Tested: writes files, then reads them back via the Arrow reader, asserting row counts
- `KafkaSource`/`KafkaSink` (feature `kafka`): integration with `rdkafka`, resuming offsets from persisted state
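A sketch of the partition-path layout described above, assuming chrono-style date formatting (the real ParquetSink internals are not shown here):

```rust
use chrono::{DateTime, Utc};
use std::path::PathBuf;

/// Sketch: build the output directory for a record, mirroring the
/// `out_dir/dt=YYYY-MM-DD/` layout (or `<field>=<value>/` when a
/// partition field is configured).
fn partition_dir(
    out_dir: &str,
    event_time: DateTime<Utc>,
    partition_format: &str,                // e.g. "%Y-%m-%d" or "%Y-%m"
    partition_field: Option<(&str, &str)>, // (field name, value), if configured
) -> PathBuf {
    let part = match partition_field {
        Some((field, value)) => format!("{field}={value}"),
        None => format!("dt={}", event_time.format(partition_format)),
    };
    PathBuf::from(out_dir).join(part)
}
```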
Observability
- Tracing spans on operators (receive/emit) using `tracing`
- Prometheus metrics (via `pulse-core::metrics`):
  - `pulse_operator_records_total{operator,stage=receive|emit}`
  - `pulse_watermark_lag_ms` (gauge)
  - `pulse_bytes_written_total{sink}`
  - `pulse_state_size{operator}`
  - `pulse_operator_process_latency_ms` (histogram)
  - `pulse_sink_process_latency_ms` (histogram)
  - `pulse_queue_depth` (gauge)
  - `pulse_dropped_records_total{reason}` (counter)
- `/metrics` HTTP endpoint served by `pulse-bin` (axum 0.7)
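As an illustration, the lag gauge reduces to "now minus current watermark" in milliseconds; a hedged sketch using the prometheus crate (the actual registration lives in `pulse-core::metrics`):

```rust
use chrono::{DateTime, Utc};
use prometheus::IntGauge;

/// Sketch: update a lag gauge such as `pulse_watermark_lag_ms` whenever a
/// watermark advances. The gauge is assumed to be registered elsewhere.
fn record_watermark_lag(gauge: &IntGauge, watermark: DateTime<Utc>) {
    let lag_ms = (Utc::now() - watermark).num_milliseconds().max(0);
    gauge.set(lag_ms);
}
```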
CLI: pulse
Binary crate: `pulse-bin`. Subcommands:
- `pulse serve --port 9898`
  - Serves `/metrics` in Prometheus format.
- `pulse run --config pipeline.toml [--http-port 9898]`
  - Loads a TOML config, validates it, builds the pipeline, and runs until EOF (or Ctrl-C if you wire a streaming source).
  - If `--http-port` is provided, starts `/metrics` on that port.
  - Optional backpressure (soft bound) via the environment: set `PULSE_CHANNEL_BOUND` (e.g., `PULSE_CHANNEL_BOUND=10000`) to drop new records when the in-flight depth reaches the bound; watermarks are never dropped (sketched below).
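A sketch of that soft-bound policy (illustrative only; the executor's real channel handling may differ):

```rust
/// Sketch: data records are dropped once the in-flight depth hits the bound,
/// while watermarks always pass through.
enum Message {
    Record(serde_json::Value),
    Watermark(chrono::DateTime<chrono::Utc>),
}

fn should_drop(msg: &Message, in_flight: usize, bound: Option<usize>) -> bool {
    match (msg, bound) {
        (Message::Watermark(_), _) => false,      // never drop watermarks
        (Message::Record(_), Some(b)) => in_flight >= b,
        (Message::Record(_), None) => false,      // unbounded: never drop
    }
}
```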
Config format (`pulse-core::config`)
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
event_time_field = "event_time"

[watermark]
interval = "10s"

[window]
# supported: tumbling|sliding|session
kind = "sliding"
size = "60s"
slide = "15s"   # for sliding; for session, use: gap = "30s"

[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count"           # default
# agg_field = "value"     # required for sum|avg|distinct

[sink]
kind = "parquet"
out_dir = "outputs"
## Optional Parquet settings
# compression = "snappy"      # one of: snappy (default) | zstd | none
# max_bytes = 104857600       # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id" # partition by a payload field value
# partition_format = "%Y-%m"  # date partition format when partitioning by event_time
```
Validation rules:
- `source.kind` must be `file` (or `kafka`)
- `sink.kind` must be `parquet`/`file` (or `kafka`)
- `ops.count_by` must be present
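To make the rules concrete, here is a sketch of deserializing and validating such a config with serde + toml; the struct and field names are illustrative, not the actual `pulse-core::config` types.

```rust
use serde::Deserialize;

// Illustrative shapes only; the real types live in pulse-core::config.
#[derive(Deserialize)]
struct SourceCfg { kind: String }
#[derive(Deserialize)]
struct SinkCfg { kind: String }
#[derive(Deserialize)]
struct OpsCfg { count_by: Option<String> }
#[derive(Deserialize)]
struct PipelineCfg { source: SourceCfg, sink: SinkCfg, ops: OpsCfg }

fn validate(cfg: &PipelineCfg) -> Result<(), String> {
    if !matches!(cfg.source.kind.as_str(), "file" | "kafka") {
        return Err(format!("unsupported source.kind: {}", cfg.source.kind));
    }
    if !matches!(cfg.sink.kind.as_str(), "parquet" | "file" | "kafka") {
        return Err(format!("unsupported sink.kind: {}", cfg.sink.kind));
    }
    if cfg.ops.count_by.is_none() {
        return Err("ops.count_by must be present".into());
    }
    Ok(())
}
```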
Example: run from config
```bash
# Build
cargo build
# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898
# Scrape metrics
curl http://127.0.0.1:9898/metrics
```
Expected output:
- Parquet files created under `outputs/dt=YYYY-MM-DD/part-*.parquet`.
Example: CSV → Parquet
The CLI supports JSONL and CSV via `source.format`.
- Direct CSV in the CLI:
```toml
[source]
kind = "file"
format = "csv"               # jsonl | csv
path = "input.csv"
event_time_field = "event_time"    # epoch ms in CSV

[watermark]
interval = "10s"

[window]
kind = "tumbling"
size = "60s"

[ops]
count_by = "word"

[sink]
kind = "parquet"
out_dir = "outputs"
```
- Use the Rust API directly:
```rust
// Module paths shown are indicative; see the crate docs for the exact layout.
use pulse_core::Executor;
use pulse_io::{FileSource, ParquetSink};
use pulse_ops::WindowedAggregate;

// Pipeline body elided here: CSV FileSource -> WindowedAggregate -> ParquetSink, driven by the Executor.
async fn run() { /* ... */ }
```
Examples: other aggregations in the CLI
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
event_time_field = "event_time"

[watermark]
interval = "10s"

[window]
kind = "tumbling"
size = "60s"

[ops]
count_by = "word"
agg = "avg"         # or: sum | distinct | count
agg_field = "score" # required for avg/sum/distinct

[sink]
kind = "parquet"
out_dir = "outputs"
```
Examples crate
- Data examples under `pulse-examples/examples/`
- Sample dataset: `sliding_avg.jsonl` (generated earlier) works with the file source
- You can also wire other examples in `pulse-examples` using the operators from `pulse-ops`
Optional integrations
Enable per crate features:
```bash
# Kafka
cargo build -p pulse-io --features kafka
# Arrow/Parquet
cargo build -p pulse-io --features parquet
```
Windows + Kafka notes: enabling `kafka` builds the native librdkafka by default and requires CMake and the MSVC Build Tools.
- Install CMake and Visual Studio Build Tools (C++), then rerun the build
- Or link against a preinstalled librdkafka and remove the `cmake-build` feature
Running tests
From the workspace root, you can run tests per crate. Some features are crate-specific:
```bash
# Operators crate
cargo test -p pulse-ops -- --nocapture
# I/O crate with Parquet
cargo test -p pulse-io --features parquet -- --nocapture
# CLI crate (includes end-to-end goldens)
cargo test -p pulse-bin -- --nocapture
# All workspace tests (no extra features)
cargo test -- --nocapture
```
Notes:
- Only the `pulse-io` crate defines the `parquet` feature. Do not pass `--features parquet` to other crates.
- The `kafka` feature is also only on `pulse-io` and requires native dependencies on Windows.
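The Parquet round-trip tests follow the pattern mentioned earlier (write, read back via the Arrow reader, assert row counts); a condensed sketch of that check using the parquet crate's Arrow reader:

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

/// Sketch: count the rows of a Parquet file via the Arrow reader.
fn count_rows(path: &str) -> usize {
    let file = File::open(path).expect("open parquet file");
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .expect("read metadata")
        .build()
        .expect("build reader");
    reader.map(|batch| batch.expect("record batch").num_rows()).sum()
}

// Usage in a test: assert_eq!(count_rows("outputs/dt=2024-01-01/part-0.parquet"), expected_rows);
```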
Why not Flink / Arroyo / Fluvio / Materialize?
Pulse takes a pragmatic, local-first approach for single-node pipelines. A comparison at a glance:
| System | Install/runtime footprint | Local-first UX | Event-time/windowing | SQL | Cluster | Primary niche | 
|---|---|---|---|---|---|---|
| Flink | Heavy (cluster/services) | No (dev spins cluster) | Yes (rich) | Yes | Yes | Large-scale distributed streaming | 
| Arroyo | Moderate (service + workers) | Partial | Yes | Yes | Yes | Cloud-native streaming w/ SQL | 
| Fluvio | Broker + clients | Partial | Limited | No | Yes | Distributed streaming data plane | 
| Materialize | Service + storage | Partial | Incremental views | Yes | Yes | Streaming SQL materialized views | 
| Pulse | Single binary | Yes | Yes (MVP focused) | No | No | Local-first pipelines & testing | 
If you need distributed scale, multi-tenant scheduling, or a SQL-first experience, those systems are a better fit. Pulse aims to be the simplest way to iterate on event-time pipelines locally and ship small, self-contained jobs.