Skip to main content

influxdb3_client/
config.rs

1use std::time::Duration;
2
3use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
4use url::Url;
5
6use crate::{error::Error, retry::RetryConfig, write::WriteOptions};
7
8/// Configuration for the InfluxDB 3 client.
9///
10/// Construct with [`ClientConfig::builder()`] or parse from a connection string /
11/// environment variables with [`ClientConfig::from_connection_string()`] /
12/// [`ClientConfig::from_env()`].
13#[derive(Debug, Clone)]
14pub struct ClientConfig {
15    /// InfluxDB host URL (e.g. `https://cluster.influxdata.io`).
16    pub host: String,
17
18    /// API token.
19    pub token: Option<String>,
20
21    /// Authentication scheme: `"Bearer"` (default) or `"Token"`.
22    pub auth_scheme: String,
23
24    /// Database for all operations. Required; validated at construction time.
25    pub database: String,
26
27    /// Organization name (used for v2 API compatibility).
28    pub org: Option<String>,
29
30    /// Default write options applied to every write call.
31    pub write_options: WriteOptions,
32
33    /// Default retry policy for transient write/query failures. Override per
34    /// request with `WriteRequest`/`QueryRequest` `.retry()` / `.no_retry()`.
35    pub retry: RetryConfig,
36
37    /// Extra HTTP headers sent with every request.
38    pub headers: HeaderMap,
39
40    /// Path to a PEM file with additional CA roots for TLS verification.
41    pub ssl_roots_path: Option<String>,
42
43    /// HTTP proxy URL.
44    pub proxy: Option<String>,
45
46    /// Request timeout for write calls.
47    pub write_timeout: Duration,
48
49    /// Timeout for the Flight channel connect and for collected (`.await`)
50    /// queries. Streaming queries (`.stream()`) are intentionally unbounded.
51    pub query_timeout: Duration,
52
53    /// Keep-alive idle connection timeout.
54    pub idle_connection_timeout: Duration,
55
56    /// Maximum number of idle connections in the pool.
57    pub max_idle_connections: usize,
58}
59
60impl Default for ClientConfig {
61    fn default() -> Self {
62        ClientConfig {
63            host: String::new(),
64            token: None,
65            auth_scheme: "Bearer".to_string(),
66            database: String::new(), // validated as non-empty in build()
67            org: None,
68            write_options: WriteOptions::default(),
69            retry: RetryConfig::default(),
70            headers: HeaderMap::new(),
71            ssl_roots_path: None,
72            proxy: None,
73            write_timeout: Duration::from_secs(30),
74            query_timeout: Duration::from_secs(60),
75            idle_connection_timeout: Duration::from_secs(90),
76            max_idle_connections: 100,
77        }
78    }
79}
80
81impl ClientConfig {
82    /// Start building a config.
83    pub fn builder() -> ClientConfigBuilder {
84        ClientConfigBuilder::default()
85    }
86
87    /// Parse `INFLUX_HOST`, `INFLUX_TOKEN`, `INFLUX_DATABASE`, and `INFLUX_ORG`
88    /// from the process environment. `INFLUX_HOST` and `INFLUX_DATABASE` are
89    /// required; token and org are optional.
90    pub fn from_env() -> Result<Self, Error> {
91        let host = std::env::var("INFLUX_HOST").map_err(|_| Error::EnvVar("INFLUX_HOST".into()))?;
92        let database = std::env::var("INFLUX_DATABASE")
93            .or_else(|_| std::env::var("INFLUX_BUCKET"))
94            .map_err(|_| Error::EnvVar("INFLUX_DATABASE".into()))?;
95
96        let token = std::env::var("INFLUX_TOKEN").ok();
97        let org = std::env::var("INFLUX_ORG").ok();
98
99        ClientConfig::builder()
100            .host(host)
101            .database(database)
102            .token_opt(token)
103            .org_opt(org)
104            .build()
105    }
106
107    /// Parse a URL-formatted connection string, e.g.:
108    ///
109    /// ```text
110    /// https://cluster.influxdata.io/?token=TOKEN&database=DB&org=ORG
111    /// ```
112    ///
113    /// `database` (or `bucket`) is required; returns an error if absent.
114    pub fn from_connection_string(cs: &str) -> Result<Self, Error> {
115        let url = Url::parse(cs)?;
116        let host = format!("{}://{}", url.scheme(), url.host_str().unwrap_or_default());
117
118        let mut builder = ClientConfig::builder().host(host);
119
120        for (key, value) in url.query_pairs() {
121            match key.as_ref() {
122                "token" => {
123                    builder = builder.token(value.into_owned());
124                }
125                "database" | "bucket" => {
126                    builder = builder.database(value.into_owned());
127                }
128                "org" => {
129                    builder = builder.org(value.into_owned());
130                }
131                _other => {}
132            }
133        }
134
135        builder.build()
136    }
137
138    /// Return the normalised host URL (trailing slash stripped).
139    pub fn host_url(&self) -> &str {
140        self.host.trim_end_matches('/')
141    }
142
143    /// Build the `Authorization` header value (`"Bearer TOKEN"` etc.).
144    ///
145    /// Returns `Ok(None)` when no token is set. Returns an error if the token
146    /// contains characters that are invalid in an HTTP header value.
147    pub fn authorization_header(&self) -> Result<Option<HeaderValue>, Error> {
148        match &self.token {
149            None => Ok(None),
150            Some(tok) => HeaderValue::from_str(&format!("{} {}", self.auth_scheme, tok))
151                .map(Some)
152                .map_err(|_| Error::Config("token contains invalid header characters".into())),
153        }
154    }
155}
156
157/// Fluent builder for [`ClientConfig`].
158#[derive(Debug, Default)]
159pub struct ClientConfigBuilder {
160    cfg: ClientConfig,
161    /// Validated when [`ClientConfigBuilder::build`] is called, so a malformed
162    /// header surfaces as an error rather than a panic at insertion time.
163    pending_headers: Vec<(String, String)>,
164}
165
166impl ClientConfigBuilder {
167    /// Required: the InfluxDB host URL.
168    pub fn host(mut self, host: impl Into<String>) -> Self {
169        self.cfg.host = host.into();
170        self
171    }
172
173    pub fn token(mut self, token: impl Into<String>) -> Self {
174        self.cfg.token = Some(token.into());
175        self
176    }
177
178    pub fn token_opt(mut self, token: Option<String>) -> Self {
179        self.cfg.token = token;
180        self
181    }
182
183    /// `"Bearer"` (default) or `"Token"`.
184    pub fn auth_scheme(mut self, scheme: impl Into<String>) -> Self {
185        self.cfg.auth_scheme = scheme.into();
186        self
187    }
188
189    pub fn database(mut self, db: impl Into<String>) -> Self {
190        self.cfg.database = db.into();
191        self
192    }
193
194    pub fn org(mut self, org: impl Into<String>) -> Self {
195        self.cfg.org = Some(org.into());
196        self
197    }
198
199    pub fn org_opt(mut self, org: Option<String>) -> Self {
200        self.cfg.org = org;
201        self
202    }
203
204    pub fn write_options(mut self, opts: WriteOptions) -> Self {
205        self.cfg.write_options = opts;
206        self
207    }
208
209    /// Set the default retry policy for transient write/query failures.
210    pub fn retry(mut self, retry: RetryConfig) -> Self {
211        self.cfg.retry = retry;
212        self
213    }
214
215    /// Add a single extra HTTP header sent with every request.
216    ///
217    /// The name and value are validated in [`build`](Self::build), so an
218    /// invalid header is reported as an error rather than panicking here.
219    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
220        self.pending_headers.push((key.into(), value.into()));
221        self
222    }
223
224    pub fn ssl_roots_path(mut self, path: impl Into<String>) -> Self {
225        self.cfg.ssl_roots_path = Some(path.into());
226        self
227    }
228
229    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
230        self.cfg.proxy = Some(proxy.into());
231        self
232    }
233
234    pub fn write_timeout(mut self, dur: Duration) -> Self {
235        self.cfg.write_timeout = dur;
236        self
237    }
238
239    pub fn query_timeout(mut self, dur: Duration) -> Self {
240        self.cfg.query_timeout = dur;
241        self
242    }
243
244    pub fn idle_connection_timeout(mut self, dur: Duration) -> Self {
245        self.cfg.idle_connection_timeout = dur;
246        self
247    }
248
249    pub fn max_idle_connections(mut self, n: usize) -> Self {
250        self.cfg.max_idle_connections = n;
251        self
252    }
253
254    /// Validate and produce the final [`ClientConfig`].
255    ///
256    /// Returns an error if `host` or `database` were not set.
257    pub fn build(mut self) -> Result<ClientConfig, Error> {
258        if self.cfg.host.is_empty() {
259            return Err(Error::Config("host is required".into()));
260        }
261        Url::parse(&self.cfg.host)
262            .map_err(|e| Error::Config(format!("invalid host URL '{}': {e}", self.cfg.host)))?;
263        if self.cfg.database.is_empty() {
264            return Err(Error::Config("database is required".into()));
265        }
266
267        for (key, value) in self.pending_headers {
268            let name = HeaderName::from_bytes(key.as_bytes())
269                .map_err(|e| Error::Config(format!("invalid header name '{key}': {e}")))?;
270            let val = HeaderValue::from_str(&value)
271                .map_err(|e| Error::Config(format!("invalid value for header '{key}': {e}")))?;
272            self.cfg.headers.insert(name, val);
273        }
274
275        // Surface a malformed token now rather than on the first request.
276        self.cfg.authorization_header()?;
277
278        Ok(self.cfg)
279    }
280}