influxdb3-client
An async Rust client for InfluxDB 3 Core and Enterprise.
InfluxDB 3 is the latest generation of the InfluxDB time series engine, built on Apache Arrow and DataFusion. Core is the free, single-node build for recent data and edge workloads; Enterprise adds clustering, high availability, and historical query performance on top of the same engine. Both speak the same HTTP write API and serve queries over Arrow Flight, so this client works against either.
- Write data guide: https://docs.influxdata.com/influxdb3/enterprise/write-data/
- Downloads: https://www.influxdata.com/downloads/
This client is part of the InfluxDB 3 client family and mirrors the feature set of the official Go and Python clients with an idiomatic Rust API.
Installation
Requires Rust 1.86 or later. The optional polars feature requires Rust 1.88 or later.
Or add it to Cargo.toml:
[]
= "0.1"
= { = "1", = ["full"] }
The optional polars feature adds DataFrame writes and query-to-DataFrame
conversion:
= { = "0.1", = ["polars"] }
Configuring a client
A client needs a host, a database, and (usually) an API token. Build the configuration explicitly:
use ;
async
Or read INFLUX_HOST, INFLUX_TOKEN, and INFLUX_DATABASE from the
environment:
let client = from_env.await?;
Or parse a connection string:
let client = from_connection_string.await?;
The Arrow Flight channel used for queries is opened lazily on the first query, so constructing a client never blocks on query connectivity.
Writing data
client.write(data) returns a builder. Chain options, then .await. data can
be a line-protocol string, a Vec<Point>, or a polars DataFrame (see below).
Points
use ;
let points = vec!;
client.write.precision.await?;
Raw line protocol
client
.write
.await?;
Write options
client.write
.precision
.batch_size // points per HTTP request
.max_inflight // concurrent in-flight requests
.default_tag
.no_sync // acknowledge before WAL sync
.await?;
Large inputs are split into batches and sent as multiple pipelined requests; one batch buffer is held in memory at a time.
High-throughput ingest
For sustained, high-volume writes the throughput levers are batch_size (points
per request), max_inflight (concurrent requests per call), and no_sync()
(acknowledge before the WAL is synced, trading durability for speed).
A single write call serialises its batches on one task. To use more CPU cores
and connections, run several write calls concurrently. Client is cheap to
share, and its HTTP connection pool is reused, so wrap it in an Arc, spread
chunks across tasks, and cap concurrency with a semaphore to keep in-flight
buffers bounded:
use Arc;
use Semaphore;
let client = new;
let gate = new; // cap concurrent writes
for chunk in chunks
To spread load across multiple ingest nodes, put a load balancer in front of the
cluster, or construct one Client per node and distribute chunks across them.
Set max_idle_connections to at least the total number of concurrent requests
you expect.
Updates and deletes
Writes are idempotent at the (series, timestamp, field) level: writing a point
with the same measurement, tag set, and timestamp overwrites the previous field
values (last write wins). Data deletion and retention are managed at the database
level and are not exposed by this client.
Querying data
InfluxDB 3 supports both SQL and InfluxQL. Use client.sql(q) or
client.influxql(q); both return a query builder.
let result = client
.sql
.await?;
for row in result
InfluxQL is called the same way:
let result = client
.influxql
.await?;
Parameterised queries
let rows = client
.sql
.param
.await?
.rows?;
if let Some = rows.first
Working with rows
A QueryResult can be iterated row by row, collected with .rows(), or accessed
as raw Arrow RecordBatches with .record_batches(). A Row is indexed by
column name (row["col"]) or position (row[0]), and yields a Value with
typed accessors (as_f64, as_i64, as_str, as_bool, is_null).
Streaming large results
For results too large to hold in memory, stream the Arrow batches:
use TryStreamExt;
let mut stream = client.sql.stream.await?;
while let Some = stream.try_next.await?
Reliability
Transient failures are retried automatically with exponential backoff and full
jitter. Connection errors, timeouts, 429, and 5xx responses are retried;
Retry-After is honoured when present. Deterministic failures (other 4xx, and
partial writes) are never retried. Retrying writes is safe because line-protocol
writes are idempotent.
use RetryConfig;
use Duration;
// Per-request override.
client.write
.retry
.await?;
// Disable retries for a single call.
client.write.no_retry.await?;
Set a default policy for all requests with ClientConfig::builder().retry(...).
Partial writes
When a batch contains invalid lines, the server accepts the valid ones and
reports the rest. This surfaces as Error::PartialWrite, which lists the
rejected lines:
use Error;
if let Err = client.write.await
Polars integration
With the polars feature, write a DataFrame directly and read query results back
as a DataFrame.
use DataFrameWrite;
use *;
let df = df!?;
client
.write
.await?;
let df_back = client
.sql
.await?
.to_polars?;
Reading from a parquet or CSV file
File IO lives in your code, not the client: read the file with polars, then hand
the frame to DataFrameWrite. Enable the reader you need on polars in your own
Cargo.toml:
= { = "0.53", = ["parquet"] } # or "csv"
use File;
use *;
let df = new.finish?;
client
.write
.await?;
CSV works the same via CsvReadOptions, but its columns infer as strings unless
you supply dtypes; cast the numeric and bool columns before writing or they will
land as string fields.
Examples
Runnable examples are in examples/:
quickstart.rs: end-to-end write and query.cloud_dedicated.rs: connecting to InfluxDB Cloud Dedicated.write_dataframe.rs: polars DataFrame write and read-back (requires--features polars).
INFLUX_HOST=http://localhost:8181 INFLUX_TOKEN=token INFLUX_DATABASE=mydb \
Contributing
Contributions are welcome. To build and check locally:
The polars feature is gated behind a flag, so test it separately:
A few conventions to keep in mind:
- Keep
cargo clippy --all-targetsfree of errors. - The
config_testsenv-var tests mutate process environment, so run that file single-threaded if you see collisions:cargo test --test config_tests -- --test-threads=1. - Comments and docs are ASCII only.
Please open an issue to discuss substantial changes before sending a pull request.
License
MIT