influxdb3_client/write.rs
1use std::collections::HashMap;
2
3use crate::{point::Point, precision::Precision};
4
5/// Options controlling a single write operation.
6///
7/// Defaults live in [`crate::ClientConfig::write_options`]; individual writes
8/// override values via the [`crate::Client::write`] builder.
9#[derive(Debug, Clone)]
10pub struct WriteOptions {
11 /// Timestamp precision for this write. Defaults to `Nanosecond`.
12 pub precision: Precision,
13
14 /// Tags merged into every point before serialisation.
15 /// Point-level tags take precedence on collision.
16 pub default_tags: HashMap<String, String>,
17
18 /// If `Some(n)`, compress the body with gzip when it exceeds `n` bytes.
19 /// `Some(0)` always compresses; `None` never compresses. Defaults to
20 /// `Some(1024)`.
21 ///
22 /// Compression trades CPU for bandwidth. The default suits remote/cloud
23 /// targets where bandwidth dominates. For high-throughput ingest over a
24 /// fast LAN (flight-test, IIoT), gzip CPU can become the bottleneck. Set
25 /// `gzip_threshold(None)` to disable it, or raise the threshold so only
26 /// large batches are compressed.
27 pub gzip_threshold: Option<usize>,
28
29 /// When `true`, skip WAL synchronisation (faster, lower durability).
30 pub no_sync: bool,
31
32 /// When `true`, a batch is accepted even if some lines are invalid.
33 pub accept_partial: bool,
34
35 /// When `true`, use the v2 (`/api/v2/write`) endpoint instead of v3.
36 pub use_v2_api: bool,
37
38 /// Optional tag ordering for deterministic line-protocol output.
39 pub tag_order: Vec<String>,
40
41 /// Maximum number of points per HTTP request when calling `write`.
42 /// Larger inputs are streamed as multiple sequential or pipelined requests.
43 /// Defaults to `5_000`.
44 ///
45 /// This is a point count, not a byte size. The effective ceiling is the
46 /// server's maximum request size (configurable on InfluxDB, 10 MB by
47 /// default); if you raise `batch_size` into a `413`, raise that limit too.
48 pub batch_size: usize,
49
50 /// Maximum number of concurrent in-flight HTTP requests when writing
51 /// multiple batches. Defaults to `4`. Set to `1` for strict ordering.
52 pub max_inflight: usize,
53}
54
55/// Default maximum number of points per write request.
56pub const DEFAULT_BATCH_SIZE: usize = 5_000;
57
58/// Default maximum number of concurrent in-flight HTTP write requests.
59pub const DEFAULT_MAX_INFLIGHT: usize = 4;
60
61impl Default for WriteOptions {
62 fn default() -> Self {
63 WriteOptions {
64 precision: Precision::Nanosecond,
65 default_tags: HashMap::new(),
66 gzip_threshold: Some(1024),
67 no_sync: false,
68 accept_partial: true,
69 use_v2_api: false,
70 tag_order: Vec::new(),
71 batch_size: DEFAULT_BATCH_SIZE,
72 max_inflight: DEFAULT_MAX_INFLIGHT,
73 }
74 }
75}
76
77/// A type that can be lazily serialised to InfluxDB line protocol for writing.
78///
79/// Pass anything that implements this trait to [`crate::Client::write`].
80///
81/// | Type | Use case |
82/// |---|---|
83/// | `&str` / `String` | Pre-formatted line protocol (low-level escape hatch) |
84/// | `Vec<Point>` | Point builder API (pass ownership; clone a slice with `.to_vec()` if you must keep it) |
85/// | [`crate::write_dataframe::DataFrameWrite`] | polars DataFrame (`polars` feature) |
86///
87/// Implementations return an iterator that yields **one batch per HTTP
88/// request**. The iterator is consumed lazily, so only one batch buffer lives
89/// in memory at a time even for million-point writes.
90pub trait WriteInput {
91 /// Lazily produce line-protocol batches, one per HTTP request.
92 ///
93 /// Implementations should respect `opts.batch_size`. Errors per batch are
94 /// returned in the iterator so partially-valid inputs can still send what
95 /// they can.
96 fn into_lp_batches(
97 self,
98 opts: &WriteOptions,
99 ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send>;
100}
101
102impl WriteInput for &str {
103 fn into_lp_batches(
104 self,
105 _opts: &WriteOptions,
106 ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
107 if self.is_empty() {
108 Box::new(std::iter::empty())
109 } else {
110 Box::new(std::iter::once(Ok(self.as_bytes().to_vec())))
111 }
112 }
113}
114
115impl WriteInput for String {
116 fn into_lp_batches(
117 self,
118 _opts: &WriteOptions,
119 ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
120 if self.is_empty() {
121 Box::new(std::iter::empty())
122 } else {
123 Box::new(std::iter::once(Ok(self.into_bytes())))
124 }
125 }
126}
127
128/// Lazy iterator that serialises chunks of points into LP buffers on demand.
129pub(crate) struct PointBatchIter {
130 points: Vec<Point>,
131 idx: usize,
132 batch_size: usize,
133 precision: Precision,
134 default_tags: HashMap<String, String>,
135 tag_order: Vec<String>,
136}
137
138impl Iterator for PointBatchIter {
139 type Item = crate::Result<Vec<u8>>;
140
141 fn next(&mut self) -> Option<Self::Item> {
142 if self.idx >= self.points.len() {
143 return None;
144 }
145 let end = (self.idx + self.batch_size).min(self.points.len());
146 // Pre-size at roughly 64 bytes per point.
147 let mut buf = Vec::with_capacity((end - self.idx) * 64);
148 let tag_order = if self.tag_order.is_empty() {
149 None
150 } else {
151 Some(self.tag_order.as_slice())
152 };
153 // One scratch buffer reused for every point in the batch.
154 let mut key_scratch = Vec::new();
155 for point in &self.points[self.idx..end] {
156 if let Err(e) = point.write_line_protocol(
157 &mut buf,
158 self.precision,
159 &self.default_tags,
160 tag_order,
161 &mut key_scratch,
162 ) {
163 self.idx = self.points.len(); // stop iteration after error
164 return Some(Err(e));
165 }
166 buf.push(b'\n');
167 }
168 // Drop the trailing newline.
169 if buf.last() == Some(&b'\n') {
170 buf.pop();
171 }
172 self.idx = end;
173 Some(Ok(buf))
174 }
175}
176
177impl WriteInput for Vec<Point> {
178 fn into_lp_batches(
179 self,
180 opts: &WriteOptions,
181 ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
182 Box::new(PointBatchIter {
183 points: self,
184 idx: 0,
185 batch_size: opts.batch_size.max(1),
186 precision: opts.precision,
187 default_tags: opts.default_tags.clone(),
188 tag_order: opts.tag_order.clone(),
189 })
190 }
191}