influxdb3_client/
config.rs1use std::time::Duration;
2
3use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
4use url::Url;
5
6use crate::{error::Error, retry::RetryConfig, write::WriteOptions};
7
8#[derive(Debug, Clone)]
14pub struct ClientConfig {
15 pub host: String,
17
18 pub token: Option<String>,
20
21 pub auth_scheme: String,
23
24 pub database: String,
26
27 pub org: Option<String>,
29
30 pub write_options: WriteOptions,
32
33 pub retry: RetryConfig,
36
37 pub headers: HeaderMap,
39
40 pub ssl_roots_path: Option<String>,
42
43 pub proxy: Option<String>,
45
46 pub write_timeout: Duration,
48
49 pub query_timeout: Duration,
52
53 pub idle_connection_timeout: Duration,
55
56 pub max_idle_connections: usize,
58}
59
60impl Default for ClientConfig {
61 fn default() -> Self {
62 ClientConfig {
63 host: String::new(),
64 token: None,
65 auth_scheme: "Bearer".to_string(),
66 database: String::new(), org: None,
68 write_options: WriteOptions::default(),
69 retry: RetryConfig::default(),
70 headers: HeaderMap::new(),
71 ssl_roots_path: None,
72 proxy: None,
73 write_timeout: Duration::from_secs(30),
74 query_timeout: Duration::from_secs(60),
75 idle_connection_timeout: Duration::from_secs(90),
76 max_idle_connections: 100,
77 }
78 }
79}
80
81impl ClientConfig {
82 pub fn builder() -> ClientConfigBuilder {
84 ClientConfigBuilder::default()
85 }
86
87 pub fn from_env() -> Result<Self, Error> {
91 let host = std::env::var("INFLUX_HOST").map_err(|_| Error::EnvVar("INFLUX_HOST".into()))?;
92 let database = std::env::var("INFLUX_DATABASE")
93 .or_else(|_| std::env::var("INFLUX_BUCKET"))
94 .map_err(|_| Error::EnvVar("INFLUX_DATABASE".into()))?;
95
96 let token = std::env::var("INFLUX_TOKEN").ok();
97 let org = std::env::var("INFLUX_ORG").ok();
98
99 ClientConfig::builder()
100 .host(host)
101 .database(database)
102 .token_opt(token)
103 .org_opt(org)
104 .build()
105 }
106
107 pub fn from_connection_string(cs: &str) -> Result<Self, Error> {
115 let url = Url::parse(cs)?;
116 let host = format!("{}://{}", url.scheme(), url.host_str().unwrap_or_default());
117
118 let mut builder = ClientConfig::builder().host(host);
119
120 for (key, value) in url.query_pairs() {
121 match key.as_ref() {
122 "token" => {
123 builder = builder.token(value.into_owned());
124 }
125 "database" | "bucket" => {
126 builder = builder.database(value.into_owned());
127 }
128 "org" => {
129 builder = builder.org(value.into_owned());
130 }
131 _other => {}
132 }
133 }
134
135 builder.build()
136 }
137
138 pub fn host_url(&self) -> &str {
140 self.host.trim_end_matches('/')
141 }
142
143 pub fn authorization_header(&self) -> Result<Option<HeaderValue>, Error> {
148 match &self.token {
149 None => Ok(None),
150 Some(tok) => HeaderValue::from_str(&format!("{} {}", self.auth_scheme, tok))
151 .map(Some)
152 .map_err(|_| Error::Config("token contains invalid header characters".into())),
153 }
154 }
155}
156
157#[derive(Debug, Default)]
159pub struct ClientConfigBuilder {
160 cfg: ClientConfig,
161 pending_headers: Vec<(String, String)>,
164}
165
166impl ClientConfigBuilder {
167 pub fn host(mut self, host: impl Into<String>) -> Self {
169 self.cfg.host = host.into();
170 self
171 }
172
173 pub fn token(mut self, token: impl Into<String>) -> Self {
174 self.cfg.token = Some(token.into());
175 self
176 }
177
178 pub fn token_opt(mut self, token: Option<String>) -> Self {
179 self.cfg.token = token;
180 self
181 }
182
183 pub fn auth_scheme(mut self, scheme: impl Into<String>) -> Self {
185 self.cfg.auth_scheme = scheme.into();
186 self
187 }
188
189 pub fn database(mut self, db: impl Into<String>) -> Self {
190 self.cfg.database = db.into();
191 self
192 }
193
194 pub fn org(mut self, org: impl Into<String>) -> Self {
195 self.cfg.org = Some(org.into());
196 self
197 }
198
199 pub fn org_opt(mut self, org: Option<String>) -> Self {
200 self.cfg.org = org;
201 self
202 }
203
204 pub fn write_options(mut self, opts: WriteOptions) -> Self {
205 self.cfg.write_options = opts;
206 self
207 }
208
209 pub fn retry(mut self, retry: RetryConfig) -> Self {
211 self.cfg.retry = retry;
212 self
213 }
214
215 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
220 self.pending_headers.push((key.into(), value.into()));
221 self
222 }
223
224 pub fn ssl_roots_path(mut self, path: impl Into<String>) -> Self {
225 self.cfg.ssl_roots_path = Some(path.into());
226 self
227 }
228
229 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
230 self.cfg.proxy = Some(proxy.into());
231 self
232 }
233
234 pub fn write_timeout(mut self, dur: Duration) -> Self {
235 self.cfg.write_timeout = dur;
236 self
237 }
238
239 pub fn query_timeout(mut self, dur: Duration) -> Self {
240 self.cfg.query_timeout = dur;
241 self
242 }
243
244 pub fn idle_connection_timeout(mut self, dur: Duration) -> Self {
245 self.cfg.idle_connection_timeout = dur;
246 self
247 }
248
249 pub fn max_idle_connections(mut self, n: usize) -> Self {
250 self.cfg.max_idle_connections = n;
251 self
252 }
253
254 pub fn build(mut self) -> Result<ClientConfig, Error> {
258 if self.cfg.host.is_empty() {
259 return Err(Error::Config("host is required".into()));
260 }
261 Url::parse(&self.cfg.host)
262 .map_err(|e| Error::Config(format!("invalid host URL '{}': {e}", self.cfg.host)))?;
263 if self.cfg.database.is_empty() {
264 return Err(Error::Config("database is required".into()));
265 }
266
267 for (key, value) in self.pending_headers {
268 let name = HeaderName::from_bytes(key.as_bytes())
269 .map_err(|e| Error::Config(format!("invalid header name '{key}': {e}")))?;
270 let val = HeaderValue::from_str(&value)
271 .map_err(|e| Error::Config(format!("invalid value for header '{key}': {e}")))?;
272 self.cfg.headers.insert(name, val);
273 }
274
275 self.cfg.authorization_header()?;
277
278 Ok(self.cfg)
279 }
280}