Skip to main content

influxdb3_client/
point.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fmt;
4
5use chrono::{DateTime, Utc};
6use indexmap::IndexMap;
7
8use crate::{error::Error, precision::Precision};
9
10/// A typed value that can be stored in an InfluxDB field.
11#[derive(Debug, Clone, PartialEq)]
12pub enum FieldValue {
13    Float(f64),
14    Integer(i64),
15    UInteger(u64),
16    String(String),
17    Boolean(bool),
18}
19
20impl FieldValue {
21    /// Write the value in line-protocol notation into `buf`.
22    pub(crate) fn write_lp(&self, buf: &mut Vec<u8>) {
23        match self {
24            FieldValue::Float(v) => {
25                if v.is_finite() {
26                    let mut ryu = ryu::Buffer::new();
27                    let s = ryu.format(*v);
28                    buf.extend_from_slice(s.as_bytes());
29                    // ryu always includes a decimal point for finite floats.
30                } else {
31                    // NaN / inf are not valid LP; emit the debug form so server
32                    // returns a clear per-line error rather than silent corruption.
33                    use std::io::Write;
34                    let _ = write!(buf, "{v}");
35                }
36            }
37            FieldValue::Integer(v) => {
38                let mut itoa_buf = itoa::Buffer::new();
39                buf.extend_from_slice(itoa_buf.format(*v).as_bytes());
40                buf.push(b'i');
41            }
42            FieldValue::UInteger(v) => {
43                let mut itoa_buf = itoa::Buffer::new();
44                buf.extend_from_slice(itoa_buf.format(*v).as_bytes());
45                buf.push(b'u');
46            }
47            FieldValue::String(v) => {
48                buf.push(b'"');
49                write_escaped_string_field(buf, v);
50                buf.push(b'"');
51            }
52            FieldValue::Boolean(v) => {
53                buf.extend_from_slice(if *v { b"true" } else { b"false" });
54            }
55        }
56    }
57}
58
59impl fmt::Display for FieldValue {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            FieldValue::Float(v) => write!(f, "{v}"),
63            FieldValue::Integer(v) => write!(f, "{v}"),
64            FieldValue::UInteger(v) => write!(f, "{v}"),
65            FieldValue::String(v) => f.write_str(v),
66            FieldValue::Boolean(v) => write!(f, "{v}"),
67        }
68    }
69}
70
71impl From<f64> for FieldValue {
72    fn from(v: f64) -> Self {
73        FieldValue::Float(v)
74    }
75}
76impl From<f32> for FieldValue {
77    fn from(v: f32) -> Self {
78        FieldValue::Float(v as f64)
79    }
80}
81impl From<i64> for FieldValue {
82    fn from(v: i64) -> Self {
83        FieldValue::Integer(v)
84    }
85}
86impl From<i32> for FieldValue {
87    fn from(v: i32) -> Self {
88        FieldValue::Integer(v as i64)
89    }
90}
91impl From<i16> for FieldValue {
92    fn from(v: i16) -> Self {
93        FieldValue::Integer(v as i64)
94    }
95}
96impl From<i8> for FieldValue {
97    fn from(v: i8) -> Self {
98        FieldValue::Integer(v as i64)
99    }
100}
101impl From<u64> for FieldValue {
102    fn from(v: u64) -> Self {
103        FieldValue::UInteger(v)
104    }
105}
106impl From<u32> for FieldValue {
107    fn from(v: u32) -> Self {
108        FieldValue::UInteger(v as u64)
109    }
110}
111impl From<u16> for FieldValue {
112    fn from(v: u16) -> Self {
113        FieldValue::UInteger(v as u64)
114    }
115}
116impl From<u8> for FieldValue {
117    fn from(v: u8) -> Self {
118        FieldValue::UInteger(v as u64)
119    }
120}
121impl From<bool> for FieldValue {
122    fn from(v: bool) -> Self {
123        FieldValue::Boolean(v)
124    }
125}
126impl From<String> for FieldValue {
127    fn from(v: String) -> Self {
128        FieldValue::String(v)
129    }
130}
131impl From<&str> for FieldValue {
132    fn from(v: &str) -> Self {
133        FieldValue::String(v.to_owned())
134    }
135}
136
137/// A single time-series data point ready to be written to InfluxDB 3.
138///
139/// Tags and fields use [`IndexMap`] internally so `tag()` / `field()` dedupe in
140/// O(1) regardless of point width, so wide points (hundreds-to-thousands of
141/// fields, typical for flight-test telemetry and IIoT4.0 PLC data) build in
142/// linear time.
143#[derive(Debug, Clone, Default)]
144pub struct Point {
145    pub(crate) measurement: String,
146    pub(crate) tags: IndexMap<String, String>,
147    pub(crate) fields: IndexMap<String, FieldValue>,
148    pub(crate) timestamp: Option<i64>,
149}
150
151impl Point {
152    /// Create a new point with the given measurement name.
153    pub fn new(measurement: impl Into<String>) -> Self {
154        Point {
155            measurement: measurement.into(),
156            ..Default::default()
157        }
158    }
159
160    /// Pre-allocate space for `n` fields.  Useful when building wide points
161    /// (hundreds of fields per point) where you know the field count up front.
162    pub fn with_capacity(measurement: impl Into<String>, n_fields: usize) -> Self {
163        Point {
164            measurement: measurement.into(),
165            tags: IndexMap::new(),
166            fields: IndexMap::with_capacity(n_fields),
167            timestamp: None,
168        }
169    }
170
171    /// Add or update a tag.  O(1) deduplication.
172    pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
173        self.tags.insert(key.into(), value.into());
174        self
175    }
176
177    /// Add or update a field.  O(1) deduplication.
178    pub fn field(mut self, key: impl Into<String>, value: impl Into<FieldValue>) -> Self {
179        self.fields.insert(key.into(), value.into());
180        self
181    }
182
183    /// Set the timestamp as nanoseconds since the Unix epoch.
184    pub fn timestamp_nanos(mut self, nanos: i64) -> Self {
185        self.timestamp = Some(nanos);
186        self
187    }
188
189    /// Set the timestamp from a [`DateTime<Utc>`].
190    pub fn timestamp_datetime(mut self, dt: DateTime<Utc>) -> Self {
191        self.timestamp = dt.timestamp_nanos_opt();
192        self
193    }
194
195    pub fn measurement(&self) -> &str {
196        &self.measurement
197    }
198    pub fn tags(&self) -> &IndexMap<String, String> {
199        &self.tags
200    }
201    pub fn fields(&self) -> &IndexMap<String, FieldValue> {
202        &self.fields
203    }
204    pub fn timestamp(&self) -> Option<i64> {
205        self.timestamp
206    }
207
208    /// Serialise the point to InfluxDB line protocol with the given precision.
209    ///
210    /// Returns an error if the point has no fields.
211    pub fn to_line_protocol(&self, precision: Precision) -> Result<String, Error> {
212        let mut buf = Vec::with_capacity(64);
213        let mut key_scratch = Vec::new();
214        self.write_line_protocol(&mut buf, precision, &HashMap::new(), None, &mut key_scratch)?;
215        Ok(String::from_utf8(buf).expect("line protocol is valid UTF-8"))
216    }
217
218    /// Write the line-protocol representation directly into `buf`, serialising
219    /// straight into the batch buffer without intermediate allocations.
220    ///
221    /// `key_scratch` is a caller-owned buffer reused across points in a batch so
222    /// that sorting tag keys does not allocate per point; its contents are
223    /// overwritten on each call.
224    pub(crate) fn write_line_protocol<'p>(
225        &'p self,
226        buf: &mut Vec<u8>,
227        precision: Precision,
228        default_tags: &HashMap<String, String>,
229        tag_order: Option<&[String]>,
230        key_scratch: &mut Vec<&'p str>,
231    ) -> Result<(), Error> {
232        if self.fields.is_empty() {
233            return Err(Error::Config(format!(
234                "point '{}' has no fields; at least one field is required",
235                self.measurement
236            )));
237        }
238
239        // Measurement
240        write_escaped_measurement(buf, &self.measurement);
241
242        // Tags: merge defaults with point tags (point tags win on conflict).
243        // Skip the merge entirely when there are no default tags (the common
244        // hot-path case): we already have a deduplicated IndexMap.
245        if default_tags.is_empty() && tag_order.is_none() {
246            if !self.tags.is_empty() {
247                key_scratch.clear();
248                key_scratch.extend(self.tags.keys().map(String::as_str));
249                key_scratch.sort_unstable();
250                for &k in key_scratch.iter() {
251                    buf.push(b',');
252                    write_escaped_tag_key(buf, k);
253                    buf.push(b'=');
254                    write_escaped_tag_value(buf, &self.tags[k]);
255                }
256            }
257        } else {
258            // Merge path: only walked when default_tags or tag_order is set.
259            let mut tag_map: HashMap<&str, &str> =
260                HashMap::with_capacity(default_tags.len() + self.tags.len());
261            for (k, v) in default_tags {
262                tag_map.insert(k.as_str(), v.as_str());
263            }
264            for (k, v) in &self.tags {
265                tag_map.insert(k.as_str(), v.as_str());
266            }
267
268            if !tag_map.is_empty() {
269                let ordered_keys: Vec<&str> = if let Some(order) = tag_order {
270                    let mut ordered: Vec<&str> = order
271                        .iter()
272                        .filter(|k| tag_map.contains_key(k.as_str()))
273                        .map(|k| k.as_str())
274                        .collect();
275                    let mut rest: Vec<&str> = tag_map
276                        .keys()
277                        .copied()
278                        .filter(|k| !order.iter().any(|o| o.as_str() == *k))
279                        .collect();
280                    rest.sort_unstable();
281                    ordered.extend(rest);
282                    ordered
283                } else {
284                    let mut keys: Vec<&str> = tag_map.keys().copied().collect();
285                    keys.sort_unstable();
286                    keys
287                };
288
289                for k in ordered_keys {
290                    buf.push(b',');
291                    write_escaped_tag_key(buf, k);
292                    buf.push(b'=');
293                    write_escaped_tag_value(buf, tag_map[k]);
294                }
295            }
296        }
297
298        // Fields
299        buf.push(b' ');
300        let mut first = true;
301        for (k, v) in &self.fields {
302            if !first {
303                buf.push(b',');
304            }
305            first = false;
306            write_escaped_tag_key(buf, k); // same escape rules as tag keys
307            buf.push(b'=');
308            v.write_lp(buf);
309        }
310
311        // Timestamp
312        if let Some(ts) = self.timestamp {
313            buf.push(b' ');
314            let mut itoa_buf = itoa::Buffer::new();
315            buf.extend_from_slice(itoa_buf.format(precision.scale_timestamp(ts)).as_bytes());
316        }
317
318        Ok(())
319    }
320}
321
322// See: https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/
323
324/// Returns `Cow::Borrowed` when no escaping is required, avoiding allocation on
325/// the common path where measurement/tag/field names are safe identifiers.
326fn escape_with(input: &str, needs_escape: fn(u8) -> bool) -> Cow<'_, str> {
327    if !input.bytes().any(needs_escape) {
328        return Cow::Borrowed(input);
329    }
330    let mut out = String::with_capacity(input.len() + 8);
331    for ch in input.chars() {
332        if ch.is_ascii() && needs_escape(ch as u8) {
333            out.push('\\');
334        }
335        out.push(ch);
336    }
337    Cow::Owned(out)
338}
339
340fn measurement_needs_escape(b: u8) -> bool {
341    matches!(b, b',' | b' ')
342}
343
344fn tag_needs_escape(b: u8) -> bool {
345    matches!(b, b',' | b'=' | b' ')
346}
347
348/// Escape a measurement name (commas and spaces). Shared with the DataFrame
349/// writer so both paths use the same rules.
350pub(crate) fn escape_measurement(s: &str) -> Cow<'_, str> {
351    escape_with(s, measurement_needs_escape)
352}
353
354/// Escape a tag key, tag value, or field key (commas, equals, spaces).
355pub(crate) fn escape_tag(s: &str) -> Cow<'_, str> {
356    escape_with(s, tag_needs_escape)
357}
358
359/// Escape the contents of a string field (backslash and double-quote). The
360/// caller is responsible for the surrounding quotes.
361pub(crate) fn escape_string_field(s: &str) -> Cow<'_, str> {
362    if !s.bytes().any(|b| b == b'\\' || b == b'"') {
363        return Cow::Borrowed(s);
364    }
365    let mut out = String::with_capacity(s.len() + 8);
366    for ch in s.chars() {
367        if ch == '\\' || ch == '"' {
368            out.push('\\');
369        }
370        out.push(ch);
371    }
372    Cow::Owned(out)
373}
374
375fn write_escaped_measurement(buf: &mut Vec<u8>, s: &str) {
376    buf.extend_from_slice(escape_measurement(s).as_bytes());
377}
378
379fn write_escaped_tag_key(buf: &mut Vec<u8>, s: &str) {
380    buf.extend_from_slice(escape_tag(s).as_bytes());
381}
382
383fn write_escaped_tag_value(buf: &mut Vec<u8>, s: &str) {
384    write_escaped_tag_key(buf, s); // same rules
385}
386
387fn write_escaped_string_field(buf: &mut Vec<u8>, s: &str) {
388    buf.extend_from_slice(escape_string_field(s).as_bytes());
389}