# influxdb-stream
Async streaming client for InfluxDB 2.x that lets you query millions of rows without running out of memory.
## Why?
Existing Rust InfluxDB clients load entire query results into memory:
```rust
// This will OOM with millions of rows!
let results: Vec<MyData> = client.query(query).await?;
```

influxdb-stream streams results one record at a time:
```rust
// Process millions of rows with constant memory usage
let mut stream = client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    process(record?);
}
```

## Quick Start
```rust
use influxdb_stream::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new("http://localhost:8086", "my-org", "my-token");

    let mut stream = client.query_stream(r#"
        from(bucket: "sensors")
          |> range(start: -30d)
          |> filter(fn: (r) => r._measurement == "temperature")
    "#).await?;

    while let Some(record) = stream.next().await {
        let record = record?;
        println!(
            "{}: {} = {:?}",
            record.measurement().unwrap_or_default(),
            record.field().unwrap_or_default(),
            record.value()
        );
    }

    Ok(())
}
```

## Features
- Memory-efficient: Streams results without loading the full response into memory
- Async-native: Built on tokio and futures
- All data types: Supports every InfluxDB column type (string, double, bool, long, unsignedLong, duration, base64Binary, dateTime:RFC3339)
- Error handling: All failures are returned as Results; no panics
- Zero-copy parsing: Parses InfluxDB's annotated CSV format on the fly
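To show what "parsing annotated CSV on the fly" means, here is a std-only sketch of the format. InfluxDB 2.x query responses prefix each table with annotation rows (`#datatype`, `#group`, `#default`) followed by a header row whose first cell is empty; pairing the `#datatype` row with the header yields the column schema, and each subsequent data row can then be processed as it arrives. The helper names `parse_columns` and `parse_record` are hypothetical illustrations, not this crate's API (the crate exposes `AnnotatedCsvParser` for the real thing):

```rust
/// One column of a Flux result table: its name and declared datatype.
#[derive(Debug, PartialEq)]
pub struct Column {
    pub name: String,
    pub datatype: String,
}

/// Pair the `#datatype` annotation row with the header row.
/// Both rows carry a leading cell (the annotation name, or empty), so skip it.
pub fn parse_columns(datatype_row: &str, header_row: &str) -> Vec<Column> {
    datatype_row
        .split(',')
        .skip(1) // skip the "#datatype" cell
        .zip(header_row.split(',').skip(1)) // skip the empty leading cell
        .map(|(t, n)| Column {
            name: n.trim().to_string(),
            datatype: t.trim().to_string(),
        })
        .collect()
}

/// Match one data row against the schema, yielding (column name, raw value)
/// pairs. Borrowing from the input line (rather than copying) is the idea
/// behind zero-copy parsing: one row at a time, constant memory.
pub fn parse_record<'a>(columns: &'a [Column], row: &'a str) -> Vec<(&'a str, &'a str)> {
    columns
        .iter()
        .map(|c| c.name.as_str())
        .zip(row.split(',').skip(1).map(str::trim))
        .collect()
}

fn main() {
    let cols = parse_columns(
        "#datatype,string,long,double,string",
        ",result,table,_value,_field",
    );
    // Each data row is parsed independently, so a streaming client never
    // needs to buffer more than the current line.
    let rec = parse_record(&cols, ",_result,0,23.5,temperature");
    for (name, raw) in &rec {
        println!("{name} = {raw}");
    }
}
```

A real parser must also handle quoted fields containing commas, `#default` substitution, and multiple tables per response, which this sketch omits.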
## Re-exports

```rust
pub use client::Client;
pub use error::Error;
pub use error::Result;
pub use types::DataType;
pub use types::FluxColumn;
pub use types::FluxRecord;
pub use types::FluxTableMetadata;
pub use value::Value;
pub use parser::AnnotatedCsvParser;
```