# influxdb-stream

Async streaming client for InfluxDB 2.x — query millions of time-series rows without running out of memory.
> 💡 Other Rust clients are async but load all results into `Vec<T>`. This crate is the first to offer true record-by-record streaming.
## The Problem

Existing Rust InfluxDB clients load entire query results into memory:

```rust
// Using existing crates - this will OOM with large datasets!
let results: Vec<Record> = client.query(query).await?;
```

When you're dealing with time-series data spanning months or years, with millions of data points, this approach simply doesn't work.
## The Solution

`influxdb-stream` streams results one record at a time:

```rust
// Process millions of rows with constant memory usage
let mut stream = client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    let record = record?;
    // handle one record; earlier rows are already dropped
}
```
## Comparison with Existing Crates

| Crate | InfluxDB Version | Async | Streaming | Memory Efficient |
|---|---|---|---|---|
| `influxdb` | 1.x | ✅ | ❌ | ❌ |
| `influxdb2` | 2.x | ✅ | ❌ | ❌ |
| `influx_db_client` | 1.x/2.x | ✅ | ❌ | ❌ |
| `influxdb-stream` | 2.x | ✅ | ✅ | ✅ |

All of these crates are async, but the others return `Vec<T>`. Only `influxdb-stream` streams record-by-record.
## Benchmark Results

Measured on a MacBook Pro (M-series) with InfluxDB 2.7 running locally.

### Throughput Comparison: influxdb-stream vs influxdb2
| Records | influxdb-stream | influxdb2 | Improvement |
|---|---|---|---|
| 1,000 | 166K rec/s | 132K rec/s | +26% |
| 5,000 | 387K rec/s | 276K rec/s | +40% |
| 10,000 | 456K rec/s | 334K rec/s | +37% |
### Memory Usage Comparison (System RSS)
| Records | influxdb-stream | influxdb2 | Ratio | Memory Saved |
|---|---|---|---|---|
| 10,000 | 0.95 MB | 13.03 MB | 13.7x | 92.7% |
| 50,000 | 0.23 MB | 46.88 MB | 200x | 99.5% |
| 100,000 | 0.34 MB | 71.69 MB | 208x | 99.5% |
| 200,000 | 0.06 MB | 137.84 MB | 2205x | ~100% |
**Key advantages:**

- **26-40% faster** than the `influxdb2` crate
- **O(1) memory**: constant memory usage regardless of result set size
- **200-2000x less memory** for large datasets
- **Streaming mode**: process records without storing them (`influxdb2` can't do this)
To run the benchmarks yourself, see the Running Benchmarks section under Development below.
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
influxdb-stream = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
## Quick Start

```rust
use influxdb_stream::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection parameters are illustrative
    let client = Client::new("http://localhost:8086", "my-org", "my-token");

    let mut stream = client
        .query_stream("from(bucket: \"my-bucket\") |> range(start: -1h)")
        .await?;

    while let Some(record) = stream.next().await {
        println!("{:?}", record?);
    }
    Ok(())
}
```
## API Overview

### Client

```rust
// Create with default HTTP client (argument names are illustrative)
let client = Client::new(url, org, token);

// Create with custom reqwest client (for timeouts, proxies, etc.)
let http = reqwest::Client::builder()
    .timeout(std::time::Duration::from_secs(30))
    .build()?;
let client = Client::with_http_client(url, org, token, http);
```
### Streaming Query

```rust
// Returns a Stream of Result<FluxRecord>
let mut stream = client.query_stream(flux_query).await?;
while let Some(record) = stream.next().await {
    let record = record?;
    // process one record at a time
}
```
### Collect All (for small datasets)

```rust
// Warning: loads all results into memory
let records: Vec<FluxRecord> = client.query(flux_query).await?;
```
### FluxRecord

```rust
let record: FluxRecord = /* ... */;

// Common accessors
record.time()        // Option<&DateTime<FixedOffset>>
record.measurement() // Option<String>
record.field()       // Option<String>
record.value()       // Option<&Value>

// Generic accessors (column names are illustrative)
record.get("host")        // Option<&Value>
record.get_string("host") // Option<String>
record.get_double("usage")// Option<f64>
record.get_long("count")  // Option<i64>
record.get_bool("ok")     // Option<bool>
```
## Value Types

All InfluxDB data types are supported:

| InfluxDB Type | Rust Type |
|---|---|
| `string` | `Value::String(String)` |
| `double` | `Value::Double(OrderedFloat<f64>)` |
| `boolean` | `Value::Bool(bool)` |
| `long` | `Value::Long(i64)` |
| `unsignedLong` | `Value::UnsignedLong(u64)` |
| `duration` | `Value::Duration(chrono::Duration)` |
| `base64Binary` | `Value::Base64Binary(Vec<u8>)` |
| `dateTime:RFC3339` | `Value::TimeRFC(DateTime<FixedOffset>)` |
## Use Cases

### Data Migration

```rust
let mut stream = source_client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    let record = record?;
    // forward each record to the destination store
}
```

### ETL Pipeline

```rust
let mut stream = client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    let record = record?;
    // transform and load each record
}
```

### Real-time Analytics

```rust
let mut stream = client.query_stream(query).await?;
let mut stats = Stats::new(); // Stats: a hypothetical running-aggregate type
while let Some(record) = stream.next().await {
    let record = record?;
    if let Some(v) = record.get_double("_value") {
        stats.update(v);
    }
}
println!("{:?}", stats);
```
## How It Works

```text
InfluxDB HTTP API (/api/v2/query)
        ↓  (Accept: application/csv)
reqwest bytes_stream()
        ↓
tokio_util::StreamReader
        ↓
Annotated CSV state-machine parser
        ↓
Stream<Item = Result<FluxRecord>>
```

The parser processes InfluxDB's annotated CSV format line by line, never buffering more than a single row at a time.
## Roadmap

- [x] **v0.1.0 - Query Streaming**
  - [x] Flux query execution with streaming results
  - [x] Memory-efficient processing
  - [x] All InfluxDB data types
  - [x] Comprehensive error handling
- [ ] **v0.2.0 - Write Streaming**
  - [ ] `Stream<Item = DataPoint>` → InfluxDB streaming write
  - [ ] Line Protocol conversion with chunked transfer
  - [ ] Backpressure handling
- [ ] **v0.3.0 - Arrow Flight (InfluxDB 3.x)**
  - [ ] Arrow Flight SQL protocol support
  - [ ] `RecordBatch` streaming
  - [ ] Feature flags for v2/v3
## Requirements

- Rust 1.85+ (Edition 2024)
- InfluxDB 2.x server
## Development

### Running Tests

Tests require a running InfluxDB instance. Use Docker Compose to start one:

```sh
# Start InfluxDB
docker compose up -d

# Run all tests (unit + integration)
cargo test
```
### Running Benchmarks

Benchmarks measure streaming performance with various dataset sizes:

```sh
# Run benchmarks
cargo bench

# Results are saved to target/criterion/
# Open target/criterion/report/index.html for graphs
```
### Test Coverage
| Category | Tests | Description |
|---|---|---|
| Unit | 5 | Value parsing for each data type |
| Integration | 8 | End-to-end with real InfluxDB |
| Large Dataset | 2 | 10K and 100K record streaming |
| Benchmark | 3 | Throughput, memory, latency |
## License

This project is licensed under the MIT License.
## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## Acknowledgments

This project was extracted from production code at Aniai, where it has been battle-tested processing billions of time-series data points for industrial IoT applications.