1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fmt;
4
5use chrono::{DateTime, Utc};
6use indexmap::IndexMap;
7
8use crate::{error::Error, precision::Precision};
9
10#[derive(Debug, Clone, PartialEq)]
12pub enum FieldValue {
13 Float(f64),
14 Integer(i64),
15 UInteger(u64),
16 String(String),
17 Boolean(bool),
18}
19
20impl FieldValue {
21 pub(crate) fn write_lp(&self, buf: &mut Vec<u8>) {
23 match self {
24 FieldValue::Float(v) => {
25 if v.is_finite() {
26 let mut ryu = ryu::Buffer::new();
27 let s = ryu.format(*v);
28 buf.extend_from_slice(s.as_bytes());
29 } else {
31 use std::io::Write;
34 let _ = write!(buf, "{v}");
35 }
36 }
37 FieldValue::Integer(v) => {
38 let mut itoa_buf = itoa::Buffer::new();
39 buf.extend_from_slice(itoa_buf.format(*v).as_bytes());
40 buf.push(b'i');
41 }
42 FieldValue::UInteger(v) => {
43 let mut itoa_buf = itoa::Buffer::new();
44 buf.extend_from_slice(itoa_buf.format(*v).as_bytes());
45 buf.push(b'u');
46 }
47 FieldValue::String(v) => {
48 buf.push(b'"');
49 write_escaped_string_field(buf, v);
50 buf.push(b'"');
51 }
52 FieldValue::Boolean(v) => {
53 buf.extend_from_slice(if *v { b"true" } else { b"false" });
54 }
55 }
56 }
57}
58
59impl fmt::Display for FieldValue {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 FieldValue::Float(v) => write!(f, "{v}"),
63 FieldValue::Integer(v) => write!(f, "{v}"),
64 FieldValue::UInteger(v) => write!(f, "{v}"),
65 FieldValue::String(v) => f.write_str(v),
66 FieldValue::Boolean(v) => write!(f, "{v}"),
67 }
68 }
69}
70
71impl From<f64> for FieldValue {
72 fn from(v: f64) -> Self {
73 FieldValue::Float(v)
74 }
75}
76impl From<f32> for FieldValue {
77 fn from(v: f32) -> Self {
78 FieldValue::Float(v as f64)
79 }
80}
81impl From<i64> for FieldValue {
82 fn from(v: i64) -> Self {
83 FieldValue::Integer(v)
84 }
85}
86impl From<i32> for FieldValue {
87 fn from(v: i32) -> Self {
88 FieldValue::Integer(v as i64)
89 }
90}
91impl From<i16> for FieldValue {
92 fn from(v: i16) -> Self {
93 FieldValue::Integer(v as i64)
94 }
95}
96impl From<i8> for FieldValue {
97 fn from(v: i8) -> Self {
98 FieldValue::Integer(v as i64)
99 }
100}
101impl From<u64> for FieldValue {
102 fn from(v: u64) -> Self {
103 FieldValue::UInteger(v)
104 }
105}
106impl From<u32> for FieldValue {
107 fn from(v: u32) -> Self {
108 FieldValue::UInteger(v as u64)
109 }
110}
111impl From<u16> for FieldValue {
112 fn from(v: u16) -> Self {
113 FieldValue::UInteger(v as u64)
114 }
115}
116impl From<u8> for FieldValue {
117 fn from(v: u8) -> Self {
118 FieldValue::UInteger(v as u64)
119 }
120}
121impl From<bool> for FieldValue {
122 fn from(v: bool) -> Self {
123 FieldValue::Boolean(v)
124 }
125}
126impl From<String> for FieldValue {
127 fn from(v: String) -> Self {
128 FieldValue::String(v)
129 }
130}
131impl From<&str> for FieldValue {
132 fn from(v: &str) -> Self {
133 FieldValue::String(v.to_owned())
134 }
135}
136
137#[derive(Debug, Clone, Default)]
144pub struct Point {
145 pub(crate) measurement: String,
146 pub(crate) tags: IndexMap<String, String>,
147 pub(crate) fields: IndexMap<String, FieldValue>,
148 pub(crate) timestamp: Option<i64>,
149}
150
151impl Point {
152 pub fn new(measurement: impl Into<String>) -> Self {
154 Point {
155 measurement: measurement.into(),
156 ..Default::default()
157 }
158 }
159
160 pub fn with_capacity(measurement: impl Into<String>, n_fields: usize) -> Self {
163 Point {
164 measurement: measurement.into(),
165 tags: IndexMap::new(),
166 fields: IndexMap::with_capacity(n_fields),
167 timestamp: None,
168 }
169 }
170
171 pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
173 self.tags.insert(key.into(), value.into());
174 self
175 }
176
177 pub fn field(mut self, key: impl Into<String>, value: impl Into<FieldValue>) -> Self {
179 self.fields.insert(key.into(), value.into());
180 self
181 }
182
183 pub fn timestamp_nanos(mut self, nanos: i64) -> Self {
185 self.timestamp = Some(nanos);
186 self
187 }
188
189 pub fn timestamp_datetime(mut self, dt: DateTime<Utc>) -> Self {
191 self.timestamp = dt.timestamp_nanos_opt();
192 self
193 }
194
195 pub fn measurement(&self) -> &str {
196 &self.measurement
197 }
198 pub fn tags(&self) -> &IndexMap<String, String> {
199 &self.tags
200 }
201 pub fn fields(&self) -> &IndexMap<String, FieldValue> {
202 &self.fields
203 }
204 pub fn timestamp(&self) -> Option<i64> {
205 self.timestamp
206 }
207
208 pub fn to_line_protocol(&self, precision: Precision) -> Result<String, Error> {
212 let mut buf = Vec::with_capacity(64);
213 let mut key_scratch = Vec::new();
214 self.write_line_protocol(&mut buf, precision, &HashMap::new(), None, &mut key_scratch)?;
215 Ok(String::from_utf8(buf).expect("line protocol is valid UTF-8"))
216 }
217
218 pub(crate) fn write_line_protocol<'p>(
225 &'p self,
226 buf: &mut Vec<u8>,
227 precision: Precision,
228 default_tags: &HashMap<String, String>,
229 tag_order: Option<&[String]>,
230 key_scratch: &mut Vec<&'p str>,
231 ) -> Result<(), Error> {
232 if self.fields.is_empty() {
233 return Err(Error::Config(format!(
234 "point '{}' has no fields; at least one field is required",
235 self.measurement
236 )));
237 }
238
239 write_escaped_measurement(buf, &self.measurement);
241
242 if default_tags.is_empty() && tag_order.is_none() {
246 if !self.tags.is_empty() {
247 key_scratch.clear();
248 key_scratch.extend(self.tags.keys().map(String::as_str));
249 key_scratch.sort_unstable();
250 for &k in key_scratch.iter() {
251 buf.push(b',');
252 write_escaped_tag_key(buf, k);
253 buf.push(b'=');
254 write_escaped_tag_value(buf, &self.tags[k]);
255 }
256 }
257 } else {
258 let mut tag_map: HashMap<&str, &str> =
260 HashMap::with_capacity(default_tags.len() + self.tags.len());
261 for (k, v) in default_tags {
262 tag_map.insert(k.as_str(), v.as_str());
263 }
264 for (k, v) in &self.tags {
265 tag_map.insert(k.as_str(), v.as_str());
266 }
267
268 if !tag_map.is_empty() {
269 let ordered_keys: Vec<&str> = if let Some(order) = tag_order {
270 let mut ordered: Vec<&str> = order
271 .iter()
272 .filter(|k| tag_map.contains_key(k.as_str()))
273 .map(|k| k.as_str())
274 .collect();
275 let mut rest: Vec<&str> = tag_map
276 .keys()
277 .copied()
278 .filter(|k| !order.iter().any(|o| o.as_str() == *k))
279 .collect();
280 rest.sort_unstable();
281 ordered.extend(rest);
282 ordered
283 } else {
284 let mut keys: Vec<&str> = tag_map.keys().copied().collect();
285 keys.sort_unstable();
286 keys
287 };
288
289 for k in ordered_keys {
290 buf.push(b',');
291 write_escaped_tag_key(buf, k);
292 buf.push(b'=');
293 write_escaped_tag_value(buf, tag_map[k]);
294 }
295 }
296 }
297
298 buf.push(b' ');
300 let mut first = true;
301 for (k, v) in &self.fields {
302 if !first {
303 buf.push(b',');
304 }
305 first = false;
306 write_escaped_tag_key(buf, k); buf.push(b'=');
308 v.write_lp(buf);
309 }
310
311 if let Some(ts) = self.timestamp {
313 buf.push(b' ');
314 let mut itoa_buf = itoa::Buffer::new();
315 buf.extend_from_slice(itoa_buf.format(precision.scale_timestamp(ts)).as_bytes());
316 }
317
318 Ok(())
319 }
320}
321
322fn escape_with(input: &str, needs_escape: fn(u8) -> bool) -> Cow<'_, str> {
327 if !input.bytes().any(needs_escape) {
328 return Cow::Borrowed(input);
329 }
330 let mut out = String::with_capacity(input.len() + 8);
331 for ch in input.chars() {
332 if ch.is_ascii() && needs_escape(ch as u8) {
333 out.push('\\');
334 }
335 out.push(ch);
336 }
337 Cow::Owned(out)
338}
339
340fn measurement_needs_escape(b: u8) -> bool {
341 matches!(b, b',' | b' ')
342}
343
344fn tag_needs_escape(b: u8) -> bool {
345 matches!(b, b',' | b'=' | b' ')
346}
347
348pub(crate) fn escape_measurement(s: &str) -> Cow<'_, str> {
351 escape_with(s, measurement_needs_escape)
352}
353
354pub(crate) fn escape_tag(s: &str) -> Cow<'_, str> {
356 escape_with(s, tag_needs_escape)
357}
358
359pub(crate) fn escape_string_field(s: &str) -> Cow<'_, str> {
362 if !s.bytes().any(|b| b == b'\\' || b == b'"') {
363 return Cow::Borrowed(s);
364 }
365 let mut out = String::with_capacity(s.len() + 8);
366 for ch in s.chars() {
367 if ch == '\\' || ch == '"' {
368 out.push('\\');
369 }
370 out.push(ch);
371 }
372 Cow::Owned(out)
373}
374
375fn write_escaped_measurement(buf: &mut Vec<u8>, s: &str) {
376 buf.extend_from_slice(escape_measurement(s).as_bytes());
377}
378
379fn write_escaped_tag_key(buf: &mut Vec<u8>, s: &str) {
380 buf.extend_from_slice(escape_tag(s).as_bytes());
381}
382
383fn write_escaped_tag_value(buf: &mut Vec<u8>, s: &str) {
384 write_escaped_tag_key(buf, s); }
386
387fn write_escaped_string_field(buf: &mut Vec<u8>, s: &str) {
388 buf.extend_from_slice(escape_string_field(s).as_bytes());
389}