influxdb_stream/
client.rs

1//! InfluxDB streaming client.
2//!
3//! This module provides the main `Client` type for executing streaming queries
4//! against an InfluxDB 2.x server.
5
6use std::pin::Pin;
7
8use async_stream::stream;
9use futures::{Stream, StreamExt, TryStreamExt};
10use reqwest::{Method, Url};
11use serde::Serialize;
12use tokio_util::io::StreamReader;
13
14use crate::error::Result;
15use crate::parser::AnnotatedCsvParser;
16use crate::types::FluxRecord;
17
18/// InfluxDB 2.x streaming client.
19///
20/// This client executes Flux queries and returns results as an async stream,
21/// allowing you to process millions of rows without loading them all into memory.
22///
23/// # Example
24///
25/// ```ignore
26/// use influxdb_stream::Client;
27/// use futures::StreamExt;
28///
29/// #[tokio::main]
30/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
31///     let client = Client::new("http://localhost:8086", "my-org", "my-token");
32///
33///     let mut stream = client.query_stream(r#"
34///         from(bucket: "sensors")
35///         |> range(start: -1h)
36///         |> filter(fn: (r) => r._measurement == "temperature")
37///     "#).await?;
38///
39///     while let Some(record) = stream.next().await {
40///         let record = record?;
41///         println!("Got: {:?}", record);
42///     }
43///
44///     Ok(())
45/// }
46/// ```
47#[derive(Clone)]
48pub struct Client {
49    http: reqwest::Client,
50    base_url: Url,
51    org: String,
52    token: String,
53}
54
55/// Query payload for the InfluxDB API.
56#[derive(Debug, Serialize)]
57struct QueryPayload {
58    query: String,
59    #[serde(rename = "type")]
60    query_type: String,
61    dialect: QueryDialect,
62}
63
64/// CSV dialect settings for query responses.
65#[derive(Debug, Serialize)]
66struct QueryDialect {
67    annotations: Vec<String>,
68    #[serde(rename = "commentPrefix")]
69    comment_prefix: String,
70    #[serde(rename = "dateTimeFormat")]
71    date_time_format: String,
72    delimiter: String,
73    header: bool,
74}
75
76impl Default for QueryDialect {
77    fn default() -> Self {
78        Self {
79            annotations: vec![
80                "datatype".to_string(),
81                "group".to_string(),
82                "default".to_string(),
83            ],
84            comment_prefix: "#".to_string(),
85            date_time_format: "RFC3339".to_string(),
86            delimiter: ",".to_string(),
87            header: true,
88        }
89    }
90}
91
92impl QueryPayload {
93    fn new(query: impl Into<String>) -> Self {
94        Self {
95            query: query.into(),
96            query_type: "flux".to_string(),
97            dialect: QueryDialect::default(),
98        }
99    }
100}
101
102impl Client {
103    /// Create a new InfluxDB client.
104    ///
105    /// # Arguments
106    ///
107    /// * `url` - Base URL of the InfluxDB server (e.g., "http://localhost:8086")
108    /// * `org` - Organization name
109    /// * `token` - Authentication token
110    ///
111    /// # Panics
112    ///
113    /// Panics if the provided URL is invalid.
114    pub fn new(url: impl Into<String>, org: impl Into<String>, token: impl Into<String>) -> Self {
115        let url_str = url.into();
116        let base_url = Url::parse(&url_str)
117            .unwrap_or_else(|e| panic!("Invalid InfluxDB URL '{}': {}", url_str, e));
118
119        Self {
120            http: reqwest::Client::new(),
121            base_url,
122            org: org.into(),
123            token: token.into(),
124        }
125    }
126
127    /// Create a new client with a custom reqwest client.
128    ///
129    /// This allows you to configure timeouts, proxies, TLS settings, etc.
130    pub fn with_http_client(
131        http: reqwest::Client,
132        url: impl Into<String>,
133        org: impl Into<String>,
134        token: impl Into<String>,
135    ) -> Self {
136        let url_str = url.into();
137        let base_url = Url::parse(&url_str)
138            .unwrap_or_else(|e| panic!("Invalid InfluxDB URL '{}': {}", url_str, e));
139
140        Self {
141            http,
142            base_url,
143            org: org.into(),
144            token: token.into(),
145        }
146    }
147
148    /// Get the base URL.
149    pub fn url(&self) -> &Url {
150        &self.base_url
151    }
152
153    /// Get the organization name.
154    pub fn org(&self) -> &str {
155        &self.org
156    }
157
158    /// Build the full URL for an API endpoint.
159    fn endpoint(&self, path: &str) -> String {
160        let mut url = self.base_url.clone();
161        url.set_path(path);
162        url.to_string()
163    }
164
165    /// Execute a Flux query and return results as an async stream.
166    ///
167    /// This is the primary method for querying InfluxDB. Results are streamed
168    /// one record at a time, so you can process arbitrarily large result sets
169    /// without running out of memory.
170    ///
171    /// # Arguments
172    ///
173    /// * `query` - Flux query string
174    ///
175    /// # Returns
176    ///
177    /// A stream of `Result<FluxRecord>`. Each item is either a successfully
178    /// parsed record or an error.
179    ///
180    /// # Example
181    ///
182    /// ```ignore
183    /// use futures::StreamExt;
184    ///
185    /// let mut stream = client.query_stream("from(bucket: \"test\") |> range(start: -1h)").await?;
186    ///
187    /// let mut count = 0;
188    /// while let Some(result) = stream.next().await {
189    ///     let record = result?;
190    ///     count += 1;
191    /// }
192    /// println!("Processed {} records", count);
193    /// ```
194    pub async fn query_stream(
195        &self,
196        query: impl Into<String>,
197    ) -> Result<Pin<Box<dyn Stream<Item = Result<FluxRecord>> + Send>>> {
198        let endpoint = self.endpoint("/api/v2/query");
199        let payload = QueryPayload::new(query);
200        let body = serde_json::to_string(&payload)?;
201
202        let response = self
203            .http
204            .request(Method::POST, &endpoint)
205            .header("Authorization", format!("Token {}", self.token))
206            .header("Accept", "application/csv")
207            .header("Content-Type", "application/json")
208            .query(&[("org", &self.org)])
209            .body(body)
210            .send()
211            .await?
212            .error_for_status()?;
213
214        // Convert the response body to an async reader
215        let reader = StreamReader::new(response.bytes_stream().map_err(std::io::Error::other));
216
217        let mut parser = AnnotatedCsvParser::new(reader);
218
219        // Create an async stream that yields records
220        let s = stream! {
221            loop {
222                match parser.next().await {
223                    Ok(Some(record)) => yield Ok(record),
224                    Ok(None) => break,       // EOF
225                    Err(e) => {
226                        yield Err(e);
227                        break;
228                    }
229                }
230            }
231        };
232
233        Ok(Box::pin(s))
234    }
235
236    /// Execute a Flux query and collect all results into a Vec.
237    ///
238    /// **Warning**: This loads all results into memory. For large result sets,
239    /// use `query_stream()` instead to process records one at a time.
240    ///
241    /// # Arguments
242    ///
243    /// * `query` - Flux query string
244    ///
245    /// # Returns
246    ///
247    /// A vector of all records from the query.
248    pub async fn query(&self, query: impl Into<String>) -> Result<Vec<FluxRecord>> {
249        let mut stream = self.query_stream(query).await?;
250        let mut results = Vec::new();
251
252        while let Some(item) = stream.next().await {
253            results.push(item?);
254        }
255
256        Ok(results)
257    }
258}