influxdb_stream/client.rs
1//! InfluxDB streaming client.
2//!
3//! This module provides the main `Client` type for executing streaming queries
4//! against an InfluxDB 2.x server.
5
6use std::pin::Pin;
7
8use async_stream::stream;
9use futures::{Stream, StreamExt, TryStreamExt};
10use reqwest::{Method, Url};
11use serde::Serialize;
12use tokio_util::io::StreamReader;
13
14use crate::error::Result;
15use crate::parser::AnnotatedCsvParser;
16use crate::types::FluxRecord;
17
18/// InfluxDB 2.x streaming client.
19///
20/// This client executes Flux queries and returns results as an async stream,
21/// allowing you to process millions of rows without loading them all into memory.
22///
23/// # Example
24///
25/// ```ignore
26/// use influxdb_stream::Client;
27/// use futures::StreamExt;
28///
29/// #[tokio::main]
30/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
31/// let client = Client::new("http://localhost:8086", "my-org", "my-token");
32///
33/// let mut stream = client.query_stream(r#"
34/// from(bucket: "sensors")
35/// |> range(start: -1h)
36/// |> filter(fn: (r) => r._measurement == "temperature")
37/// "#).await?;
38///
39/// while let Some(record) = stream.next().await {
40/// let record = record?;
41/// println!("Got: {:?}", record);
42/// }
43///
44/// Ok(())
45/// }
46/// ```
47#[derive(Clone)]
48pub struct Client {
49 http: reqwest::Client,
50 base_url: Url,
51 org: String,
52 token: String,
53}
54
55/// Query payload for the InfluxDB API.
56#[derive(Debug, Serialize)]
57struct QueryPayload {
58 query: String,
59 #[serde(rename = "type")]
60 query_type: String,
61 dialect: QueryDialect,
62}
63
64/// CSV dialect settings for query responses.
65#[derive(Debug, Serialize)]
66struct QueryDialect {
67 annotations: Vec<String>,
68 #[serde(rename = "commentPrefix")]
69 comment_prefix: String,
70 #[serde(rename = "dateTimeFormat")]
71 date_time_format: String,
72 delimiter: String,
73 header: bool,
74}
75
76impl Default for QueryDialect {
77 fn default() -> Self {
78 Self {
79 annotations: vec![
80 "datatype".to_string(),
81 "group".to_string(),
82 "default".to_string(),
83 ],
84 comment_prefix: "#".to_string(),
85 date_time_format: "RFC3339".to_string(),
86 delimiter: ",".to_string(),
87 header: true,
88 }
89 }
90}
91
92impl QueryPayload {
93 fn new(query: impl Into<String>) -> Self {
94 Self {
95 query: query.into(),
96 query_type: "flux".to_string(),
97 dialect: QueryDialect::default(),
98 }
99 }
100}
101
102impl Client {
103 /// Create a new InfluxDB client.
104 ///
105 /// # Arguments
106 ///
107 /// * `url` - Base URL of the InfluxDB server (e.g., "http://localhost:8086")
108 /// * `org` - Organization name
109 /// * `token` - Authentication token
110 ///
111 /// # Panics
112 ///
113 /// Panics if the provided URL is invalid.
114 pub fn new(url: impl Into<String>, org: impl Into<String>, token: impl Into<String>) -> Self {
115 let url_str = url.into();
116 let base_url = Url::parse(&url_str)
117 .unwrap_or_else(|e| panic!("Invalid InfluxDB URL '{}': {}", url_str, e));
118
119 Self {
120 http: reqwest::Client::new(),
121 base_url,
122 org: org.into(),
123 token: token.into(),
124 }
125 }
126
127 /// Create a new client with a custom reqwest client.
128 ///
129 /// This allows you to configure timeouts, proxies, TLS settings, etc.
130 pub fn with_http_client(
131 http: reqwest::Client,
132 url: impl Into<String>,
133 org: impl Into<String>,
134 token: impl Into<String>,
135 ) -> Self {
136 let url_str = url.into();
137 let base_url = Url::parse(&url_str)
138 .unwrap_or_else(|e| panic!("Invalid InfluxDB URL '{}': {}", url_str, e));
139
140 Self {
141 http,
142 base_url,
143 org: org.into(),
144 token: token.into(),
145 }
146 }
147
148 /// Get the base URL.
149 pub fn url(&self) -> &Url {
150 &self.base_url
151 }
152
153 /// Get the organization name.
154 pub fn org(&self) -> &str {
155 &self.org
156 }
157
158 /// Build the full URL for an API endpoint.
159 fn endpoint(&self, path: &str) -> String {
160 let mut url = self.base_url.clone();
161 url.set_path(path);
162 url.to_string()
163 }
164
165 /// Execute a Flux query and return results as an async stream.
166 ///
167 /// This is the primary method for querying InfluxDB. Results are streamed
168 /// one record at a time, so you can process arbitrarily large result sets
169 /// without running out of memory.
170 ///
171 /// # Arguments
172 ///
173 /// * `query` - Flux query string
174 ///
175 /// # Returns
176 ///
177 /// A stream of `Result<FluxRecord>`. Each item is either a successfully
178 /// parsed record or an error.
179 ///
180 /// # Example
181 ///
182 /// ```ignore
183 /// use futures::StreamExt;
184 ///
185 /// let mut stream = client.query_stream("from(bucket: \"test\") |> range(start: -1h)").await?;
186 ///
187 /// let mut count = 0;
188 /// while let Some(result) = stream.next().await {
189 /// let record = result?;
190 /// count += 1;
191 /// }
192 /// println!("Processed {} records", count);
193 /// ```
194 pub async fn query_stream(
195 &self,
196 query: impl Into<String>,
197 ) -> Result<Pin<Box<dyn Stream<Item = Result<FluxRecord>> + Send>>> {
198 let endpoint = self.endpoint("/api/v2/query");
199 let payload = QueryPayload::new(query);
200 let body = serde_json::to_string(&payload)?;
201
202 let response = self
203 .http
204 .request(Method::POST, &endpoint)
205 .header("Authorization", format!("Token {}", self.token))
206 .header("Accept", "application/csv")
207 .header("Content-Type", "application/json")
208 .query(&[("org", &self.org)])
209 .body(body)
210 .send()
211 .await?
212 .error_for_status()?;
213
214 // Convert the response body to an async reader
215 let reader = StreamReader::new(response.bytes_stream().map_err(std::io::Error::other));
216
217 let mut parser = AnnotatedCsvParser::new(reader);
218
219 // Create an async stream that yields records
220 let s = stream! {
221 loop {
222 match parser.next().await {
223 Ok(Some(record)) => yield Ok(record),
224 Ok(None) => break, // EOF
225 Err(e) => {
226 yield Err(e);
227 break;
228 }
229 }
230 }
231 };
232
233 Ok(Box::pin(s))
234 }
235
236 /// Execute a Flux query and collect all results into a Vec.
237 ///
238 /// **Warning**: This loads all results into memory. For large result sets,
239 /// use `query_stream()` instead to process records one at a time.
240 ///
241 /// # Arguments
242 ///
243 /// * `query` - Flux query string
244 ///
245 /// # Returns
246 ///
247 /// A vector of all records from the query.
248 pub async fn query(&self, query: impl Into<String>) -> Result<Vec<FluxRecord>> {
249 let mut stream = self.query_stream(query).await?;
250 let mut results = Vec::new();
251
252 while let Some(item) = stream.next().await {
253 results.push(item?);
254 }
255
256 Ok(results)
257 }
258}