Skip to main content

influxdb3_client/
write.rs

1use std::collections::HashMap;
2
3use crate::{point::Point, precision::Precision};
4
5/// Options controlling a single write operation.
6///
7/// Defaults live in [`crate::ClientConfig::write_options`]; individual writes
8/// override values via the [`crate::Client::write`] builder.
9#[derive(Debug, Clone)]
10pub struct WriteOptions {
11    /// Timestamp precision for this write.  Defaults to `Nanosecond`.
12    pub precision: Precision,
13
14    /// Tags merged into every point before serialisation.
15    /// Point-level tags take precedence on collision.
16    pub default_tags: HashMap<String, String>,
17
18    /// If `Some(n)`, compress the body with gzip when it exceeds `n` bytes.
19    /// `Some(0)` always compresses; `None` never compresses. Defaults to
20    /// `Some(1024)`.
21    ///
22    /// Compression trades CPU for bandwidth. The default suits remote/cloud
23    /// targets where bandwidth dominates. For high-throughput ingest over a
24    /// fast LAN (flight-test, IIoT), gzip CPU can become the bottleneck. Set
25    /// `gzip_threshold(None)` to disable it, or raise the threshold so only
26    /// large batches are compressed.
27    pub gzip_threshold: Option<usize>,
28
29    /// When `true`, skip WAL synchronisation (faster, lower durability).
30    pub no_sync: bool,
31
32    /// When `true`, a batch is accepted even if some lines are invalid.
33    pub accept_partial: bool,
34
35    /// When `true`, use the v2 (`/api/v2/write`) endpoint instead of v3.
36    pub use_v2_api: bool,
37
38    /// Optional tag ordering for deterministic line-protocol output.
39    pub tag_order: Vec<String>,
40
41    /// Maximum number of points per HTTP request when calling `write`.
42    /// Larger inputs are streamed as multiple sequential or pipelined requests.
43    /// Defaults to `5_000`.
44    ///
45    /// This is a point count, not a byte size. The effective ceiling is the
46    /// server's maximum request size (configurable on InfluxDB, 10 MB by
47    /// default); if you raise `batch_size` into a `413`, raise that limit too.
48    pub batch_size: usize,
49
50    /// Maximum number of concurrent in-flight HTTP requests when writing
51    /// multiple batches.  Defaults to `4`.  Set to `1` for strict ordering.
52    pub max_inflight: usize,
53}
54
/// Point-count ceiling per write request, used when the caller does not set
/// `WriteOptions::batch_size`.
pub const DEFAULT_BATCH_SIZE: usize = 5_000;

/// Number of HTTP write requests allowed in flight at once, used when the
/// caller does not set `WriteOptions::max_inflight`.
pub const DEFAULT_MAX_INFLIGHT: usize = 4;
60
61impl Default for WriteOptions {
62    fn default() -> Self {
63        WriteOptions {
64            precision: Precision::Nanosecond,
65            default_tags: HashMap::new(),
66            gzip_threshold: Some(1024),
67            no_sync: false,
68            accept_partial: true,
69            use_v2_api: false,
70            tag_order: Vec::new(),
71            batch_size: DEFAULT_BATCH_SIZE,
72            max_inflight: DEFAULT_MAX_INFLIGHT,
73        }
74    }
75}
76
77/// A type that can be lazily serialised to InfluxDB line protocol for writing.
78///
79/// Pass anything that implements this trait to [`crate::Client::write`].
80///
81/// | Type | Use case |
82/// |---|---|
83/// | `&str` / `String` | Pre-formatted line protocol (low-level escape hatch) |
84/// | `Vec<Point>` | Point builder API (pass ownership; clone a slice with `.to_vec()` if you must keep it) |
85/// | [`crate::write_dataframe::DataFrameWrite`] | polars DataFrame (`polars` feature) |
86///
87/// Implementations return an iterator that yields **one batch per HTTP
88/// request**. The iterator is consumed lazily, so only one batch buffer lives
89/// in memory at a time even for million-point writes.
90pub trait WriteInput {
91    /// Lazily produce line-protocol batches, one per HTTP request.
92    ///
93    /// Implementations should respect `opts.batch_size`.  Errors per batch are
94    /// returned in the iterator so partially-valid inputs can still send what
95    /// they can.
96    fn into_lp_batches(
97        self,
98        opts: &WriteOptions,
99    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send>;
100}
101
102impl WriteInput for &str {
103    fn into_lp_batches(
104        self,
105        _opts: &WriteOptions,
106    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
107        if self.is_empty() {
108            Box::new(std::iter::empty())
109        } else {
110            Box::new(std::iter::once(Ok(self.as_bytes().to_vec())))
111        }
112    }
113}
114
115impl WriteInput for String {
116    fn into_lp_batches(
117        self,
118        _opts: &WriteOptions,
119    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
120        if self.is_empty() {
121            Box::new(std::iter::empty())
122        } else {
123            Box::new(std::iter::once(Ok(self.into_bytes())))
124        }
125    }
126}
127
128/// Lazy iterator that serialises chunks of points into LP buffers on demand.
129pub(crate) struct PointBatchIter {
130    points: Vec<Point>,
131    idx: usize,
132    batch_size: usize,
133    precision: Precision,
134    default_tags: HashMap<String, String>,
135    tag_order: Vec<String>,
136}
137
138impl Iterator for PointBatchIter {
139    type Item = crate::Result<Vec<u8>>;
140
141    fn next(&mut self) -> Option<Self::Item> {
142        if self.idx >= self.points.len() {
143            return None;
144        }
145        let end = (self.idx + self.batch_size).min(self.points.len());
146        // Pre-size at roughly 64 bytes per point.
147        let mut buf = Vec::with_capacity((end - self.idx) * 64);
148        let tag_order = if self.tag_order.is_empty() {
149            None
150        } else {
151            Some(self.tag_order.as_slice())
152        };
153        // One scratch buffer reused for every point in the batch.
154        let mut key_scratch = Vec::new();
155        for point in &self.points[self.idx..end] {
156            if let Err(e) = point.write_line_protocol(
157                &mut buf,
158                self.precision,
159                &self.default_tags,
160                tag_order,
161                &mut key_scratch,
162            ) {
163                self.idx = self.points.len(); // stop iteration after error
164                return Some(Err(e));
165            }
166            buf.push(b'\n');
167        }
168        // Drop the trailing newline.
169        if buf.last() == Some(&b'\n') {
170            buf.pop();
171        }
172        self.idx = end;
173        Some(Ok(buf))
174    }
175}
176
177impl WriteInput for Vec<Point> {
178    fn into_lp_batches(
179        self,
180        opts: &WriteOptions,
181    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
182        Box::new(PointBatchIter {
183            points: self,
184            idx: 0,
185            batch_size: opts.batch_size.max(1),
186            precision: opts.precision,
187            default_tags: opts.default_tags.clone(),
188            tag_order: opts.tag_order.clone(),
189        })
190    }
191}