influxdb3-client 0.1.0

Rust client for InfluxDB 3 Core and Enterprise

influxdb3-client

An async Rust client for InfluxDB 3 Core and Enterprise.

InfluxDB 3 is the latest generation of the InfluxDB time series engine, built on Apache Arrow and DataFusion. Core is the free, single-node build for recent data and edge workloads; Enterprise adds clustering, high availability, and historical query performance on top of the same engine. Both speak the same HTTP write API and serve queries over Arrow Flight, so this client works against either.

This client is part of the InfluxDB 3 client family and mirrors the feature set of the official Go and Python clients with an idiomatic Rust API.

Installation

Requires Rust 1.86 or later. The optional polars feature requires Rust 1.88 or later.

cargo add influxdb3-client

Or add it to Cargo.toml:

[dependencies]
influxdb3-client = "0.1"
tokio = { version = "1", features = ["full"] }

The optional polars feature adds DataFrame writes and query-to-DataFrame conversion:

influxdb3-client = { version = "0.1", features = ["polars"] }

Configuring a client

A client needs a host, a database, and (usually) an API token. Build the configuration explicitly:

use influxdb3_client::{Client, ClientConfig};

#[tokio::main]
async fn main() -> influxdb3_client::Result<()> {
    let client = Client::new(
        ClientConfig::builder()
            .host("http://localhost:8181")
            .token("my-api-token")
            .database("sensors")
            .build()?,
    )
    .await?;
    Ok(())
}

Or read INFLUX_HOST, INFLUX_TOKEN, and INFLUX_DATABASE from the environment:

let client = influxdb3_client::Client::from_env().await?;

Or parse a connection string:

let client = influxdb3_client::Client::from_connection_string(
    "https://cluster.example.io/?token=TOKEN&database=mydb",
).await?;
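To make the shape of that connection string concrete, here is a minimal std-only sketch of how it breaks down into a host plus token and database query parameters. This is not the crate's parser: `ConnParts` and `split_connection_string` are hypothetical names, percent-decoding is left out, and the real client may accept more keys than the two shown in the example above.

```rust
// Illustrative only: hypothetical names, not part of influxdb3-client.
#[derive(Debug, PartialEq)]
struct ConnParts {
    host: String,
    token: Option<String>,
    database: Option<String>,
}

fn split_connection_string(s: &str) -> ConnParts {
    // Everything before '?' is the host URL; the rest is the query string.
    let (base, query) = match s.split_once('?') {
        Some((b, q)) => (b, q),
        None => (s, ""),
    };
    let mut parts = ConnParts {
        host: base.trim_end_matches('/').to_string(),
        token: None,
        database: None,
    };
    // Query pairs are key=value, separated by '&'.
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        if let Some((key, value)) = pair.split_once('=') {
            match key {
                "token" => parts.token = Some(value.to_string()),
                "database" => parts.database = Some(value.to_string()),
                _ => {} // unknown keys are ignored in this sketch
            }
        }
    }
    parts
}
```

Applied to the string above, the sketch yields the host https://cluster.example.io, the token TOKEN, and the database mydb.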

The Arrow Flight channel used for queries is opened lazily on the first query, so constructing a client never blocks on query connectivity.

Writing data

client.write(data) returns a builder: chain options, then .await it. The data argument can be a line-protocol string, a Vec<Point>, or a polars DataFrame wrapped in DataFrameWrite (see Polars integration).

Points

use influxdb3_client::{Point, Precision};

let points = vec![
    Point::new("temperature")
        .tag("location", "office")
        .tag("floor", "2")
        .field("celsius", 22.5_f64)
        .field("humidity", 48_i64)
        .field("occupied", true),
];

client.write(points).precision(Precision::Millisecond).await?;

Raw line protocol

client
    .write("cpu,host=server01 usage_user=42.3,usage_system=1.2")
    .await?;
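To relate the two input forms, the following sketch shows how a point's measurement, tags, and fields map onto a line of line protocol. It is a simplified std-only illustration, not the crate's encoder: `FieldValue`, `to_line`, and the helper functions are hypothetical names, and the crate's own Point type is not reproduced here.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for a field value; not the crate's type.
enum FieldValue {
    Float(f64),
    Int(i64),
    Bool(bool),
    Text(String),
}

// Commas and spaces are special in measurement names.
fn escape_measurement(s: &str) -> String {
    s.replace(',', "\\,").replace(' ', "\\ ")
}

// Commas, equals signs, and spaces are special in tag keys, tag values,
// and field keys.
fn escape_key(s: &str) -> String {
    s.replace(',', "\\,").replace('=', "\\=").replace(' ', "\\ ")
}

fn encode_field(v: &FieldValue) -> String {
    match v {
        FieldValue::Float(f) => format!("{f}"),
        FieldValue::Int(i) => format!("{i}i"), // integers carry an `i` suffix
        FieldValue::Bool(b) => format!("{b}"),
        // String fields are double-quoted, with `\` and `"` escaped.
        FieldValue::Text(s) => {
            format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
        }
    }
}

// BTreeMap keeps tags and fields sorted by key, so the output is stable.
fn to_line(
    measurement: &str,
    tags: &BTreeMap<&str, &str>,
    fields: &BTreeMap<&str, FieldValue>,
    timestamp: Option<i64>,
) -> String {
    let mut line = escape_measurement(measurement);
    for (k, v) in tags {
        line.push_str(&format!(",{}={}", escape_key(k), escape_key(v)));
    }
    let encoded: Vec<String> = fields
        .iter()
        .map(|(k, v)| format!("{}={}", escape_key(k), encode_field(v)))
        .collect();
    line.push(' ');
    line.push_str(&encoded.join(","));
    if let Some(ts) = timestamp {
        line.push_str(&format!(" {ts}"));
    }
    line
}
```

For the temperature point in the Points example, with a made-up millisecond timestamp of 1700000000000, this sketch produces:

temperature,floor=2,location=office celsius=22.5,humidity=48i,occupied=true 1700000000000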

Write options

client.write(points)
    .precision(Precision::Nanosecond)
    .batch_size(10_000)          // points per HTTP request
    .max_inflight(8)             // concurrent in-flight requests
    .default_tag("region", "us-east")
    .no_sync()                   // acknowledge before WAL sync
    .await?;

Large inputs are split into batches and sent as multiple pipelined requests; one batch buffer is held in memory at a time.
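As a rough illustration of the batching arithmetic, the sketch below shows how an input divides into requests for a given batch_size, and how lines can be grouped into newline-separated request bodies. Both helpers are hypothetical and std-only; they model the splitting described above, not the client's internal pipeline.

```rust
// Hypothetical helper: sizes of the batches a given input splits into.
fn plan_batches(total_points: usize, batch_size: usize) -> Vec<usize> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut sizes = Vec::new();
    let mut remaining = total_points;
    while remaining > 0 {
        let n = remaining.min(batch_size);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

// Hypothetical helper: group line-protocol lines into request bodies,
// one line per row, joined with newlines. Panics if batch_size is zero.
fn split_lines(lines: &[String], batch_size: usize) -> Vec<String> {
    lines
        .chunks(batch_size)
        .map(|chunk| chunk.join("\n"))
        .collect()
}
```

With batch_size(10_000), for example, 25,000 points become three requests of 10,000, 10,000, and 5,000 points.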

High-throughput ingest

For sustained, high-volume writes the throughput levers are batch_size (points per request), max_inflight (concurrent requests per call), and no_sync() (acknowledge before the WAL is synced, trading durability for speed).

A single write call serialises its batches on one task. To use more CPU cores and connections, run several write calls concurrently. Client is cheap to share, and its HTTP connection pool is reused, so wrap it in an Arc, spread chunks across tasks, and cap concurrency with a semaphore to keep in-flight buffers bounded:

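use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

let client = Arc::new(client);
let gate = Arc::new(Semaphore::new(8)); // cap concurrent writes
let mut tasks = JoinSet::new();

for chunk in chunks {                    // each chunk is a Vec<Point>
    // The semaphore is never closed, so acquire_owned cannot fail here.
    let permit = gate.clone().acquire_owned().await.unwrap();
    let client = Arc::clone(&client);
    tasks.spawn(async move {
        let _permit = permit;            // released when the write completes
        client
            .write(chunk)
            .batch_size(10_000)
            .max_inflight(8)
            .no_sync()
            .await
    });
}

// Wait for every write and surface the first failure instead of dropping it.
while let Some(joined) = tasks.join_next().await {
    joined.expect("write task panicked")?;
}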

To spread load across multiple ingest nodes, put a load balancer in front of the cluster, or construct one Client per node and distribute chunks across them. Set max_idle_connections to at least the total number of concurrent requests you expect; in the example above, 8 concurrent write calls with max_inflight(8) can have up to 64 requests in flight.

Updates and deletes

Writes are idempotent at the (series, timestamp, field) level: writing a point with the same measurement, tag set, and timestamp overwrites the previous field values (last write wins). Data deletion and retention are managed at the database level and are not exposed by this client.
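The overwrite rule can be pictured with a toy in-memory model. This is only a sketch of the semantics as stated above, not the server's storage engine: `ToyStore` and its methods are hypothetical, fields are limited to f64 for brevity, and it assumes that fields not mentioned in a later write are left untouched.

```rust
use std::collections::{BTreeMap, HashMap};

// A row is identified by (measurement, tag set, timestamp).
type RowKey = (String, BTreeMap<String, String>, i64);

// Toy model of last-write-wins; all names are hypothetical.
#[derive(Default)]
struct ToyStore {
    rows: HashMap<RowKey, BTreeMap<String, f64>>,
}

impl ToyStore {
    fn key(measurement: &str, tags: &[(&str, &str)], ts: i64) -> RowKey {
        // A sorted map makes tag order irrelevant to row identity.
        let tag_set: BTreeMap<String, String> = tags
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        (measurement.to_string(), tag_set, ts)
    }

    fn write(&mut self, measurement: &str, tags: &[(&str, &str)], ts: i64, fields: &[(&str, f64)]) {
        let row = self.rows.entry(Self::key(measurement, tags, ts)).or_default();
        for (name, value) in fields {
            // Same row and same field name: the newer value replaces the older.
            row.insert(name.to_string(), *value);
        }
    }

    fn get(&self, measurement: &str, tags: &[(&str, &str)], ts: i64, field: &str) -> Option<f64> {
        self.rows.get(&Self::key(measurement, tags, ts))?.get(field).copied()
    }
}
```

In this model, writing the same point twice leaves one row, which is why a repeated write is harmless; writing with a different timestamp creates a new row.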

Querying data

InfluxDB 3 supports both SQL and InfluxQL. Use client.sql(q) or client.influxql(q); both return a query builder.

let result = client
    .sql("SELECT * FROM temperature ORDER BY time DESC LIMIT 10")
    .await?;

for row in result {
    let row = row?;
    let loc = row["location"].as_str().unwrap_or("");
    let c = row["celsius"].as_f64().unwrap_or(0.0);
    println!("{loc}: {c}");
}

InfluxQL is called the same way:

let result = client
    .influxql("SELECT MEAN(celsius) FROM temperature WHERE time > now() - 1h")
    .await?;

Parameterised queries

let rows = client
    .sql("SELECT COUNT(*) AS n FROM cpu WHERE host = $host")
    .param("host", "server01")
    .await?
    .rows()?;

if let Some(r) = rows.first() {
    println!("count: {}", r["n"]);
}

Working with rows

A QueryResult can be iterated row by row, collected with .rows(), or accessed as raw Arrow RecordBatches with .record_batches(). A Row is indexed by column name (row["col"]) or position (row[0]), and yields a Value with typed accessors (as_f64, as_i64, as_str, as_bool, is_null).

Streaming large results

For results too large to hold in memory, stream the Arrow batches:

use futures_util::TryStreamExt;

let mut stream = client.sql("SELECT * FROM temperature").stream().await?;
while let Some(batch) = stream.try_next().await? {
    println!("got {} rows", batch.num_rows());
}

Reliability

Transient failures are retried automatically with exponential backoff and full jitter. Connection errors, timeouts, 429, and 5xx responses are retried; Retry-After is honoured when present. Deterministic failures (other 4xx, and partial writes) are never retried. Retrying writes is safe because line-protocol writes are idempotent when each point carries an explicit timestamp; a line without a timestamp is stamped by the server on arrival, so a retried copy can land as a separate point.

use influxdb3_client::RetryConfig;
use std::time::Duration;

// Per-request override.
client.write(points)
    .retry(RetryConfig { max_retries: 5, base_delay: Duration::from_millis(100), ..RetryConfig::default() })
    .await?;

// Disable retries for a single call.
client.write(points).no_retry().await?;

Set a default policy for all requests with ClientConfig::builder().retry(...).
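For readers unfamiliar with the terms, the sketch below shows one common way to classify retryable statuses and compute an exponential backoff delay with full jitter. It is an illustration under stated assumptions, not the crate's implementation: the function names and the max_delay cap are hypothetical, and the random sample is passed in by the caller so the example stays std-only.

```rust
use std::time::Duration;

// Hypothetical helper: statuses the text above describes as retryable.
fn is_retryable_status(status: u16) -> bool {
    status == 429 || (500..=599).contains(&status)
}

// Full jitter: pick a delay uniformly in [0, min(max_delay, base * 2^attempt)].
// `unit_sample` is a caller-supplied random number in [0.0, 1.0].
fn backoff_delay(attempt: u32, base: Duration, max_delay: Duration, unit_sample: f64) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    let ceiling = base.saturating_mul(factor).min(max_delay);
    ceiling.mul_f64(unit_sample.clamp(0.0, 1.0))
}

// Assumption: a server-supplied Retry-After replaces the computed delay.
fn next_delay(retry_after: Option<Duration>, computed: Duration) -> Duration {
    retry_after.unwrap_or(computed)
}
```

With a base delay of 100 ms, the ceiling doubles on each attempt (100 ms, 200 ms, 400 ms, ...) until it reaches the cap, and the actual wait is a random fraction of that ceiling, which keeps many clients from retrying in lockstep.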

Partial writes

When a batch contains invalid lines, the server accepts the valid ones and reports the rest. This surfaces as Error::PartialWrite, which lists the rejected lines:

use influxdb3_client::Error;

if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await {
    for line_error in &e.line_errors {
        eprintln!("line {}: {}", line_error.line, line_error.message);
    }
}

Polars integration

With the polars feature, write a DataFrame directly and read query results back as a DataFrame.

use influxdb3_client::write_dataframe::DataFrameWrite;
use polars::prelude::*;

let df = df![
    "host"    => ["srv1", "srv2"],
    "region"  => ["us-east", "us-west"],
    "cpu_pct" => [42.5_f64, 71.0_f64],
    "time"    => [1_700_000_000_000_000_000_i64, 1_700_000_001_000_000_000_i64],
]?;

client
    .write(
        DataFrameWrite::new(&df, "server_metrics")
            .tags(&["host", "region"])
            .timestamp_column("time"),
    )
    .await?;

let df_back = client
    .sql("SELECT * FROM server_metrics")
    .await?
    .to_polars()?;

Reading from a parquet or CSV file

File IO lives in your code, not the client: read the file with polars, then hand the frame to DataFrameWrite. Enable the reader you need on polars in your own Cargo.toml:

polars = { version = "0.53", features = ["parquet"] } # or "csv"

use std::fs::File;
use polars::prelude::*;

let df = ParquetReader::new(File::open("sensors.parquet")?).finish()?;

client
    .write(
        DataFrameWrite::new(&df, "sensor_data")
            .tags(&["host", "region"])
            .timestamp_column("time"),
    )
    .await?;

CSV works the same via CsvReadOptions, but CSV carries no type information, so check the schema polars infers. Cast any numeric or bool column that was read as a string (or supply dtypes up front) before writing, or it will land as a string field.

Examples

Runnable examples are in examples/:

  • quickstart.rs: end-to-end write and query.
  • cloud_dedicated.rs: connecting to InfluxDB Cloud Dedicated.
  • write_dataframe.rs: polars DataFrame write and read-back (requires --features polars).

INFLUX_HOST=http://localhost:8181 INFLUX_TOKEN=token INFLUX_DATABASE=mydb \
    cargo run --example quickstart

Contributing

Contributions are welcome. To build and check locally:

cargo build
cargo test
cargo clippy --all-targets

The polars feature is gated behind a flag, so test it separately:

cargo test --features polars

A few conventions to keep in mind:

  • Keep cargo clippy --all-targets free of errors.
  • The config_tests env-var tests mutate process environment, so run that file single-threaded if you see collisions: cargo test --test config_tests -- --test-threads=1.
  • Comments and docs are ASCII only.

Please open an issue to discuss substantial changes before sending a pull request.

License

MIT