questdb/ingress/buffer.rs
1/*******************************************************************************
2 * ___ _ ____ ____
3 * / _ \ _ _ ___ ___| |_| _ \| __ )
4 * | | | | | | |/ _ \/ __| __| | | | _ \
5 * | |_| | |_| | __/\__ \ |_| |_| | |_) |
6 * \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 * Copyright (c) 2014-2019 Appsicle
9 * Copyright (c) 2019-2025 QuestDB
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 *
23 ******************************************************************************/
24use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
25use crate::ingress::{
26 ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos,
27 ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT,
28};
29use crate::{error, Error};
30use std::fmt::{Debug, Formatter};
31use std::num::NonZeroUsize;
32use std::slice::from_raw_parts_mut;
33
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` once before and once after the escaped
/// payload, letting the caller emit opening/closing delimiters (or nothing).
///
/// The implementation is entirely safe: the destination is grown with `push`
/// after a single up-front `reserve`, so the vector never exposes
/// uninitialized bytes, even if `check_escape_fn` answers differently between
/// the counting pass and the writing pass.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // First pass: count escapes so the common "nothing to escape" case can be
    // a single bulk copy and the slow path can reserve exactly once.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        output.extend_from_slice(s.as_bytes());
    } else {
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
73
/// Tells whether byte `c` needs a backslash escape when written in an
/// unquoted position (table names, column names and symbol values).
fn must_escape_unquoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b" ,=\n\r\\";
    ESCAPED.contains(&c)
}
77
/// Tells whether byte `c` needs a backslash escape when written inside a
/// double-quoted string value.
fn must_escape_quoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b"\n\r\"\\";
    ESCAPED.contains(&c)
}
81
82fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
83 write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
84}
85
86fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
87 write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
88}
89
90pub(crate) struct F64Serializer {
91 buf: ryu::Buffer,
92 n: f64,
93}
94
95impl F64Serializer {
96 pub(crate) fn new(n: f64) -> Self {
97 F64Serializer {
98 buf: ryu::Buffer::new(),
99 n,
100 }
101 }
102
103 // This function was taken and customized from the ryu crate.
104 #[cold]
105 fn format_nonfinite(&self) -> &'static str {
106 const MANTISSA_MASK: u64 = 0x000fffffffffffff;
107 const SIGN_MASK: u64 = 0x8000000000000000;
108 let bits = self.n.to_bits();
109 if bits & MANTISSA_MASK != 0 {
110 "NaN"
111 } else if bits & SIGN_MASK != 0 {
112 "-Infinity"
113 } else {
114 "Infinity"
115 }
116 }
117
118 pub(crate) fn as_str(&mut self) -> &str {
119 if self.n.is_finite() {
120 self.buf.format_finite(self.n)
121 } else {
122 self.format_nonfinite()
123 }
124 }
125}
126
/// A buffer API operation. Each variant occupies its own bit so that a set of
/// permitted operations can be expressed as a bitwise OR of variants.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// The API method name of this operation, as used in error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Table => "table",
            Self::Symbol => "symbol",
            Self::Column => "column",
            Self::At => "at",
            Self::Flush => "flush",
        }
    }
}
147
148#[derive(Debug, Copy, Clone, PartialEq)]
149enum OpCase {
150 Init = Op::Table as isize,
151 TableWritten = Op::Symbol as isize | Op::Column as isize,
152 SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
153 ColumnWritten = Op::Column as isize | Op::At as isize,
154 MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
155}
156
157impl OpCase {
158 fn next_op_descr(self) -> &'static str {
159 match self {
160 OpCase::Init => "should have called `table` instead",
161 OpCase::TableWritten => "should have called `symbol` or `column` instead",
162 OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
163 OpCase::ColumnWritten => "should have called `column` or `at` instead",
164 OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
165 }
166 }
167}
168
169// IMPORTANT: This struct MUST remain `Copy` to ensure that
170// there are no heap allocations when performing marker operations.
171#[derive(Debug, Clone, Copy)]
172struct BufferState {
173 op_case: OpCase,
174 row_count: usize,
175 first_table_len: Option<NonZeroUsize>,
176 transactional: bool,
177}
178
179impl BufferState {
180 fn new() -> Self {
181 Self {
182 op_case: OpCase::Init,
183 row_count: 0,
184 first_table_len: None,
185 transactional: true,
186 }
187 }
188}
189
190/// A validated table name.
191///
192/// This type simply wraps a `&str`.
193///
194/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
195/// it doesn't have to validate it again. This saves CPU cycles.
196#[derive(Clone, Copy)]
197pub struct TableName<'a> {
198 name: &'a str,
199}
200
201impl<'a> TableName<'a> {
202 /// Construct a validated table name.
203 pub fn new(name: &'a str) -> crate::Result<Self> {
204 if name.is_empty() {
205 return Err(error::fmt!(
206 InvalidName,
207 "Table names must have a non-zero length."
208 ));
209 }
210
211 let mut prev = '\0';
212 for (index, c) in name.chars().enumerate() {
213 match c {
214 '.' => {
215 if index == 0 || index == name.len() - 1 || prev == '.' {
216 return Err(error::fmt!(
217 InvalidName,
218 concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
219 name,
220 index
221 ));
222 }
223 }
224 '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
225 | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
226 | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
227 | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
228 return Err(error::fmt!(
229 InvalidName,
230 concat!(
231 "Bad string {:?}: ",
232 "Table names can't contain ",
233 "a {:?} character, which was found at ",
234 "byte position {}."
235 ),
236 name,
237 c,
238 index
239 ));
240 }
241 '\u{feff}' => {
242 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
243 // aka UTF-8 BOM if it appears anywhere in the string.
244 return Err(error::fmt!(
245 InvalidName,
246 concat!(
247 "Bad string {:?}: ",
248 "Table names can't contain ",
249 "a UTF-8 BOM character, which was found at ",
250 "byte position {}."
251 ),
252 name,
253 index
254 ));
255 }
256 _ => (),
257 }
258 prev = c;
259 }
260
261 Ok(Self { name })
262 }
263
264 /// Construct a table name without validating it.
265 ///
266 /// This breaks API encapsulation and is only intended for use
267 /// when the string was already previously validated.
268 ///
269 /// The QuestDB server will reject an invalid table name.
270 pub fn new_unchecked(name: &'a str) -> Self {
271 Self { name }
272 }
273}
274
275impl<'a> TryFrom<&'a str> for TableName<'a> {
276 type Error = Error;
277
278 fn try_from(name: &'a str) -> crate::Result<Self> {
279 Self::new(name)
280 }
281}
282
283impl AsRef<str> for TableName<'_> {
284 fn as_ref(&self) -> &str {
285 self.name
286 }
287}
288
289/// A validated column name.
290///
291/// This type simply wraps a `&str`.
292///
293/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
294/// it doesn't have to validate it again. This saves CPU cycles.
295#[derive(Clone, Copy)]
296pub struct ColumnName<'a> {
297 name: &'a str,
298}
299
300impl<'a> ColumnName<'a> {
301 /// Construct a validated table name.
302 pub fn new(name: &'a str) -> crate::Result<Self> {
303 if name.is_empty() {
304 return Err(error::fmt!(
305 InvalidName,
306 "Column names must have a non-zero length."
307 ));
308 }
309
310 for (index, c) in name.chars().enumerate() {
311 match c {
312 '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
313 | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
314 | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
315 | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
316 return Err(error::fmt!(
317 InvalidName,
318 concat!(
319 "Bad string {:?}: ",
320 "Column names can't contain ",
321 "a {:?} character, which was found at ",
322 "byte position {}."
323 ),
324 name,
325 c,
326 index
327 ));
328 }
329 '\u{FEFF}' => {
330 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
331 // aka UTF-8 BOM if it appears anywhere in the string.
332 return Err(error::fmt!(
333 InvalidName,
334 concat!(
335 "Bad string {:?}: ",
336 "Column names can't contain ",
337 "a UTF-8 BOM character, which was found at ",
338 "byte position {}."
339 ),
340 name,
341 index
342 ));
343 }
344 _ => (),
345 }
346 }
347
348 Ok(Self { name })
349 }
350
351 /// Construct a column name without validating it.
352 ///
353 /// This breaks API encapsulation and is only intended for use
354 /// when the string was already previously validated.
355 ///
356 /// The QuestDB server will reject an invalid column name.
357 pub fn new_unchecked(name: &'a str) -> Self {
358 Self { name }
359 }
360}
361
362impl<'a> TryFrom<&'a str> for ColumnName<'a> {
363 type Error = Error;
364
365 fn try_from(name: &'a str) -> crate::Result<Self> {
366 Self::new(name)
367 }
368}
369
370impl AsRef<str> for ColumnName<'_> {
371 fn as_ref(&self) -> &str {
372 self.name
373 }
374}
375
376/// A reusable buffer to prepare a batch of ILP messages.
377///
378/// # Example
379///
380/// ```no_run
381/// # use questdb::Result;
382/// # use questdb::ingress::SenderBuilder;
383///
384/// # fn main() -> Result<()> {
385/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
386/// # use questdb::Result;
387/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
388/// let mut buffer = sender.new_buffer();
389///
390/// // first row
391/// buffer
392/// .table("table1")?
393/// .symbol("bar", "baz")?
394/// .column_bool("a", false)?
395/// .column_i64("b", 42)?
396/// .column_f64("c", 3.14)?
397/// .column_str("d", "hello")?
398/// .column_ts("e", TimestampMicros::now())?
399/// .at(TimestampNanos::now())?;
400///
401/// // second row
402/// buffer
403/// .table("table2")?
404/// .symbol("foo", "bar")?
405/// .at(TimestampNanos::now())?;
406/// # Ok(())
407/// # }
408/// ```
409///
410/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
411///
412/// # Sequential Coupling
413/// The Buffer API is sequentially coupled:
414/// * A row always starts with [`table`](Buffer::table).
415/// * A row must contain at least one [`symbol`](Buffer::symbol) or
416/// column (
417/// [`column_bool`](Buffer::column_bool),
418/// [`column_i64`](Buffer::column_i64),
419/// [`column_f64`](Buffer::column_f64),
420/// [`column_str`](Buffer::column_str),
421/// [`column_arr`](Buffer::column_arr),
422/// [`column_ts`](Buffer::column_ts)).
423/// * Symbols must appear before columns.
424/// * A row must be terminated with either [`at`](Buffer::at) or
425/// [`at_now`](Buffer::at_now).
426///
427/// This diagram visualizes the sequence:
428///
429/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
430///
431/// # Buffer method calls, Serialized ILP types and QuestDB types
432///
433/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
434/// |---------------|--------------------------------------------------------------|
435/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
436/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
437/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
438/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
439/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
440/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
441/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
442///
443/// QuestDB supports both `STRING` and `SYMBOL` column types.
444///
445/// To understand the difference, refer to the
446/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
447/// symbols are interned strings, most suitable for identifiers that are repeated many
448/// times throughout the column. They offer an advantage in storage space and query
449/// performance.
450///
451/// # Inserting NULL values
452///
453/// To insert a NULL value, skip the symbol or column for that row.
454///
455/// # Recovering from validation errors
456///
457/// If you want to recover from potential validation errors, call
458/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
459/// append as many rows or parts of rows as you like, and then call
460/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
461///
462/// If there was an error in one of the rows, use
463/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
464/// marked last known good state.
465///
466#[derive(Clone)]
467pub struct Buffer {
468 output: Vec<u8>,
469 state: BufferState,
470 marker: Option<(usize, BufferState)>,
471 max_name_len: usize,
472 protocol_version: ProtocolVersion,
473}
474
475impl Buffer {
476 /// Creates a new [`Buffer`] with default parameters.
477 ///
478 /// - Uses the specified protocol version
479 /// - Sets maximum name length to **127 characters** (QuestDB server default)
480 ///
481 /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
482 /// and [`Sender::max_name_len`] is 127.
483 ///
484 /// For custom name lengths, use [`Self::with_max_name_len`]
485 pub fn new(protocol_version: ProtocolVersion) -> Self {
486 Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
487 }
488
489 /// Creates a new [`Buffer`] with a custom maximum name length.
490 ///
491 /// - `max_name_len`: Maximum allowed length for table/column names, match
492 /// your QuestDB server's `cairo.max.file.name.length` configuration
493 /// - `protocol_version`: Protocol version to use
494 ///
495 /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
496 /// and [`Sender::max_name_len`].
497 ///
498 /// For the default max name length limit (127), use [`Self::new`].
499 pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
500 Self {
501 output: Vec::new(),
502 state: BufferState::new(),
503 marker: None,
504 max_name_len,
505 protocol_version,
506 }
507 }
508
509 pub fn protocol_version(&self) -> ProtocolVersion {
510 self.protocol_version
511 }
512
513 /// Pre-allocate to ensure the buffer has enough capacity for at least the
514 /// specified additional byte count. This may be rounded up.
515 /// This does not allocate if such additional capacity is already satisfied.
516 /// See: `capacity`.
517 pub fn reserve(&mut self, additional: usize) {
518 self.output.reserve(additional);
519 }
520
521 /// The number of bytes accumulated in the buffer.
522 pub fn len(&self) -> usize {
523 self.output.len()
524 }
525
526 /// The number of rows accumulated in the buffer.
527 pub fn row_count(&self) -> usize {
528 self.state.row_count
529 }
530
531 /// Tells whether the buffer is transactional. It is transactional iff it contains
532 /// data for at most one table. Additionally, you must send the buffer over HTTP to
533 /// get transactional behavior.
534 pub fn transactional(&self) -> bool {
535 self.state.transactional
536 }
537
538 pub fn is_empty(&self) -> bool {
539 self.output.is_empty()
540 }
541
542 /// The total number of bytes the buffer can hold before it needs to resize.
543 pub fn capacity(&self) -> usize {
544 self.output.capacity()
545 }
546
547 pub fn as_bytes(&self) -> &[u8] {
548 &self.output
549 }
550
551 /// Mark a rewind point.
552 /// This allows undoing accumulated changes to the buffer for one or more
553 /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
554 /// Any previous marker will be discarded.
555 /// Once the marker is no longer needed, call
556 /// [`clear_marker`](Buffer::clear_marker).
557 pub fn set_marker(&mut self) -> crate::Result<()> {
558 if (self.state.op_case as isize & Op::Table as isize) == 0 {
559 return Err(error::fmt!(
560 InvalidApiCall,
561 concat!(
562 "Can't set the marker whilst constructing a line. ",
563 "A marker may only be set on an empty buffer or after ",
564 "`at` or `at_now` is called."
565 )
566 ));
567 }
568 self.marker = Some((self.output.len(), self.state));
569 Ok(())
570 }
571
572 /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
573 /// call.
574 ///
575 /// As a side effect, this also clears the marker.
576 pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
577 if let Some((position, state)) = self.marker.take() {
578 self.output.truncate(position);
579 self.state = state;
580 Ok(())
581 } else {
582 Err(error::fmt!(
583 InvalidApiCall,
584 "Can't rewind to the marker: No marker set."
585 ))
586 }
587 }
588
589 /// Discard any marker as may have been set by
590 /// [`set_marker`](Buffer::set_marker).
591 ///
592 /// Idempotent.
593 pub fn clear_marker(&mut self) {
594 self.marker = None;
595 }
596
597 /// Reset the buffer and clear contents whilst retaining
598 /// [`capacity`](Buffer::capacity).
599 pub fn clear(&mut self) {
600 self.output.clear();
601 self.state = BufferState::new();
602 self.marker = None;
603 }
604
605 /// Check if the next API operation is allowed as per the OP case state machine.
606 #[inline(always)]
607 fn check_op(&self, op: Op) -> crate::Result<()> {
608 if (self.state.op_case as isize & op as isize) > 0 {
609 Ok(())
610 } else {
611 Err(error::fmt!(
612 InvalidApiCall,
613 "State error: Bad call to `{}`, {}.",
614 op.descr(),
615 self.state.op_case.next_op_descr()
616 ))
617 }
618 }
619
620 /// Checks if this buffer is ready to be flushed to a sender via one of the
621 /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
622 /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
623 /// message indicating why this [`Buffer`] cannot be flushed at the moment.
624 #[inline(always)]
625 pub fn check_can_flush(&self) -> crate::Result<()> {
626 self.check_op(Op::Flush)
627 }
628
629 #[inline(always)]
630 fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
631 if name.len() > self.max_name_len {
632 return Err(error::fmt!(
633 InvalidName,
634 "Bad name: {:?}: Too long (max {} characters)",
635 name,
636 self.max_name_len
637 ));
638 }
639 Ok(())
640 }
641
642 /// Begin recording a new row for the given table.
643 ///
644 /// ```no_run
645 /// # use questdb::Result;
646 /// # use questdb::ingress::{Buffer, SenderBuilder};
647 /// # fn main() -> Result<()> {
648 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
649 /// # let mut buffer = sender.new_buffer();
650 /// buffer.table("table_name")?;
651 /// # Ok(())
652 /// # }
653 /// ```
654 ///
655 /// or
656 ///
657 /// ```no_run
658 /// # use questdb::Result;
659 /// # use questdb::ingress::{Buffer, SenderBuilder};
660 /// use questdb::ingress::TableName;
661 ///
662 /// # fn main() -> Result<()> {
663 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
664 /// # let mut buffer = sender.new_buffer();
665 /// let table_name = TableName::new("table_name")?;
666 /// buffer.table(table_name)?;
667 /// # Ok(())
668 /// # }
669 /// ```
670 pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
671 where
672 N: TryInto<TableName<'a>>,
673 Error: From<N::Error>,
674 {
675 let name: TableName<'a> = name.try_into()?;
676 self.validate_max_name_len(name.name)?;
677 self.check_op(Op::Table)?;
678 let table_begin = self.output.len();
679 write_escaped_unquoted(&mut self.output, name.name);
680 let table_end = self.output.len();
681 self.state.op_case = OpCase::TableWritten;
682
683 // A buffer stops being transactional if it targets multiple tables.
684 if let Some(first_table_len) = &self.state.first_table_len {
685 let first_table = &self.output[0..first_table_len.get()];
686 let this_table = &self.output[table_begin..table_end];
687 if first_table != this_table {
688 self.state.transactional = false;
689 }
690 } else {
691 debug_assert!(table_begin == 0);
692
693 // This is a bit confusing, so worth explaining:
694 // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
695 // but we know that `table_end` is never 0 here, we just need an option type
696 // anyway, so we don't bother unwrapping it to then wrap it again.
697 let first_table_len = NonZeroUsize::new(table_end);
698
699 // Instead we just assert that it's `Some`.
700 debug_assert!(first_table_len.is_some());
701
702 self.state.first_table_len = first_table_len;
703 }
704 Ok(self)
705 }
706
707 /// Record a symbol for the given column.
708 /// Make sure you record all symbol columns before any other column type.
709 ///
710 /// ```no_run
711 /// # use questdb::Result;
712 /// # use questdb::ingress::{Buffer, SenderBuilder};
713 /// # fn main() -> Result<()> {
714 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
715 /// # let mut buffer = sender.new_buffer();
716 /// # buffer.table("x")?;
717 /// buffer.symbol("col_name", "value")?;
718 /// # Ok(())
719 /// # }
720 /// ```
721 ///
722 /// or
723 ///
724 /// ```no_run
725 /// # use questdb::Result;
726 /// # use questdb::ingress::{Buffer, SenderBuilder};
727 /// # fn main() -> Result<()> {
728 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
729 /// # let mut buffer = sender.new_buffer();
730 /// # buffer.table("x")?;
731 /// let value: String = "value".to_owned();
732 /// buffer.symbol("col_name", value)?;
733 /// # Ok(())
734 /// # }
735 /// ```
736 ///
737 /// or
738 ///
739 /// ```no_run
740 /// # use questdb::Result;
741 /// # use questdb::ingress::{Buffer, SenderBuilder};
742 /// use questdb::ingress::ColumnName;
743 ///
744 /// # fn main() -> Result<()> {
745 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
746 /// # let mut buffer = sender.new_buffer();
747 /// # buffer.table("x")?;
748 /// let col_name = ColumnName::new("col_name")?;
749 /// buffer.symbol(col_name, "value")?;
750 /// # Ok(())
751 /// # }
752 /// ```
753 ///
754 pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
755 where
756 N: TryInto<ColumnName<'a>>,
757 S: AsRef<str>,
758 Error: From<N::Error>,
759 {
760 let name: ColumnName<'a> = name.try_into()?;
761 self.validate_max_name_len(name.name)?;
762 self.check_op(Op::Symbol)?;
763 self.output.push(b',');
764 write_escaped_unquoted(&mut self.output, name.name);
765 self.output.push(b'=');
766 write_escaped_unquoted(&mut self.output, value.as_ref());
767 self.state.op_case = OpCase::SymbolWritten;
768 Ok(self)
769 }
770
771 fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
772 where
773 N: TryInto<ColumnName<'a>>,
774 Error: From<N::Error>,
775 {
776 let name: ColumnName<'a> = name.try_into()?;
777 self.validate_max_name_len(name.name)?;
778 self.check_op(Op::Column)?;
779 self.output
780 .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
781 b' '
782 } else {
783 b','
784 });
785 write_escaped_unquoted(&mut self.output, name.name);
786 self.output.push(b'=');
787 self.state.op_case = OpCase::ColumnWritten;
788 Ok(self)
789 }
790
791 /// Record a boolean value for the given column.
792 ///
793 /// ```no_run
794 /// # use questdb::Result;
795 /// # use questdb::ingress::{Buffer, SenderBuilder};
796 /// # fn main() -> Result<()> {
797 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
798 /// # let mut buffer = sender.new_buffer();
799 /// # buffer.table("x")?;
800 /// buffer.column_bool("col_name", true)?;
801 /// # Ok(())
802 /// # }
803 /// ```
804 ///
805 /// or
806 ///
807 /// ```no_run
808 /// # use questdb::Result;
809 /// # use questdb::ingress::{Buffer, SenderBuilder};
810 /// use questdb::ingress::ColumnName;
811 ///
812 /// # fn main() -> Result<()> {
813 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
814 /// # let mut buffer = sender.new_buffer();
815 /// # buffer.table("x")?;
816 /// let col_name = ColumnName::new("col_name")?;
817 /// buffer.column_bool(col_name, true)?;
818 /// # Ok(())
819 /// # }
820 /// ```
821 pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
822 where
823 N: TryInto<ColumnName<'a>>,
824 Error: From<N::Error>,
825 {
826 self.write_column_key(name)?;
827 self.output.push(if value { b't' } else { b'f' });
828 Ok(self)
829 }
830
831 /// Record an integer value for the given column.
832 ///
833 /// ```no_run
834 /// # use questdb::Result;
835 /// # use questdb::ingress::{Buffer, SenderBuilder};
836 /// # fn main() -> Result<()> {
837 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
838 /// # let mut buffer = sender.new_buffer();
839 /// # buffer.table("x")?;
840 /// buffer.column_i64("col_name", 42)?;
841 /// # Ok(())
842 /// # }
843 /// ```
844 ///
845 /// or
846 ///
847 /// ```no_run
848 /// # use questdb::Result;
849 /// # use questdb::ingress::{Buffer, SenderBuilder};
850 /// use questdb::ingress::ColumnName;
851 ///
852 /// # fn main() -> Result<()> {
853 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
854 /// # let mut buffer = sender.new_buffer();
855 /// # buffer.table("x")?;
856 /// let col_name = ColumnName::new("col_name")?;
857 /// buffer.column_i64(col_name, 42)?;
858 /// # Ok(())
859 /// # }
860 /// ```
861 pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
862 where
863 N: TryInto<ColumnName<'a>>,
864 Error: From<N::Error>,
865 {
866 self.write_column_key(name)?;
867 let mut buf = itoa::Buffer::new();
868 let printed = buf.format(value);
869 self.output.extend_from_slice(printed.as_bytes());
870 self.output.push(b'i');
871 Ok(self)
872 }
873
874 /// Record a floating point value for the given column.
875 ///
876 /// ```no_run
877 /// # use questdb::Result;
878 /// # use questdb::ingress::{Buffer, SenderBuilder};
879 /// # fn main() -> Result<()> {
880 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
881 /// # let mut buffer = sender.new_buffer();
882 /// # buffer.table("x")?;
883 /// buffer.column_f64("col_name", 3.14)?;
884 /// # Ok(())
885 /// # }
886 /// ```
887 ///
888 /// or
889 ///
890 /// ```no_run
891 /// # use questdb::Result;
892 /// # use questdb::ingress::{Buffer, SenderBuilder};
893 /// use questdb::ingress::ColumnName;
894 ///
895 /// # fn main() -> Result<()> {
896 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
897 /// # let mut buffer = sender.new_buffer();
898 /// # buffer.table("x")?;
899 /// let col_name = ColumnName::new("col_name")?;
900 /// buffer.column_f64(col_name, 3.14)?;
901 /// # Ok(())
902 /// # }
903 /// ```
904 pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
905 where
906 N: TryInto<ColumnName<'a>>,
907 Error: From<N::Error>,
908 {
909 self.write_column_key(name)?;
910 if !matches!(self.protocol_version, ProtocolVersion::V1) {
911 self.output.push(b'=');
912 self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
913 self.output.extend_from_slice(&value.to_le_bytes())
914 } else {
915 let mut ser = F64Serializer::new(value);
916 self.output.extend_from_slice(ser.as_str().as_bytes())
917 }
918 Ok(self)
919 }
920
921 /// Record a string value for the given column.
922 ///
923 /// ```no_run
924 /// # use questdb::Result;
925 /// # use questdb::ingress::{Buffer, SenderBuilder};
926 /// # fn main() -> Result<()> {
927 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
928 /// # let mut buffer = sender.new_buffer();
929 /// # buffer.table("x")?;
930 /// buffer.column_str("col_name", "value")?;
931 /// # Ok(())
932 /// # }
933 /// ```
934 ///
935 /// or
936 ///
937 /// ```no_run
938 /// # use questdb::Result;
939 /// # use questdb::ingress::{Buffer, SenderBuilder};
940 /// # fn main() -> Result<()> {
941 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
942 /// # let mut buffer = sender.new_buffer();
943 /// # buffer.table("x")?;
944 /// let value: String = "value".to_owned();
945 /// buffer.column_str("col_name", value)?;
946 /// # Ok(())
947 /// # }
948 /// ```
949 ///
950 /// or
951 ///
952 /// ```no_run
953 /// # use questdb::Result;
954 /// # use questdb::ingress::{Buffer, SenderBuilder};
955 /// use questdb::ingress::ColumnName;
956 ///
957 /// # fn main() -> Result<()> {
958 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
959 /// # let mut buffer = sender.new_buffer();
960 /// # buffer.table("x")?;
961 /// let col_name = ColumnName::new("col_name")?;
962 /// buffer.column_str(col_name, "value")?;
963 /// # Ok(())
964 /// # }
965 /// ```
966 pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
967 where
968 N: TryInto<ColumnName<'a>>,
969 S: AsRef<str>,
970 Error: From<N::Error>,
971 {
972 self.write_column_key(name)?;
973 write_escaped_quoted(&mut self.output, value.as_ref());
974 Ok(self)
975 }
976
    /// Record a multidimensional array value for the given column.
    ///
    /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
    /// be of type `f64`, which is currently the only supported data type.
    ///
    /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
    ///
    /// The value is appended in binary form: a `=` marker, the array entity type,
    /// the element type tag, the number of dimensions, each dimension size as a
    /// little-endian `u32`, and finally the array data.
    ///
    /// # Examples
    ///
    /// Recording a 2D array using nested vectors:
    ///
    /// ```no_run
    /// # use questdb::Result;
    /// # use questdb::ingress::{Buffer, SenderBuilder};
    /// # fn main() -> Result<()> {
    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
    /// # let mut buffer = sender.new_buffer();
    /// # buffer.table("x")?;
    /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
    /// buffer.column_arr("array_col", &array_2d)?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Recording a 3D array using nested vectors and a pre-validated column name:
    ///
    /// ```no_run
    /// # use questdb::Result;
    /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
    /// # fn main() -> Result<()> {
    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
    /// # let mut buffer = sender.new_buffer();
    /// # buffer.table("x1")?;
    /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
    /// let col_name = ColumnName::new("col1")?;
    /// buffer.column_arr(col_name, &array_3d)?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if:
    /// - Protocol version v1 is used (arrays require v2+)
    /// - The array has zero dimensions
    /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
    /// - Failed to get dimension sizes
    /// - Column name validation fails
    /// - Writing the array data fails
    // NOTE(review): presumably needed because `ArrayElementSealed` is not a public
    // trait while it appears in the bounds of this public method — confirm.
    #[allow(private_bounds)]
    pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        T: NdArrayView<D>,
        D: ArrayElement + ArrayElementSealed,
        Error: From<N::Error>,
    {
        // Protocol version 1 has no array datatype; reject before touching the buffer.
        if self.protocol_version == ProtocolVersion::V1 {
            return Err(error::fmt!(
                ProtocolVersionError,
                "Protocol version v1 does not support array datatype",
            ));
        }
        let ndim = view.ndim();
        // An array needs at least one dimension.
        if ndim == 0 {
            return Err(error::fmt!(
                ArrayError,
                "Zero-dimensional arrays are not supported",
            ));
        }

        // The number of dimensions must not exceed `MAX_ARRAY_DIMS`.
        if MAX_ARRAY_DIMS < ndim {
            return Err(error::fmt!(
                ArrayError,
                "Array dimension mismatch: expected at most {} dimensions, but got {}",
                MAX_ARRAY_DIMS,
                ndim
            ));
        }

        // Obtain the size in bytes of the array data. This can fail, and it is done
        // before anything is appended to `self.output`.
        let array_buf_size = check_and_get_array_bytes_size(view)?;
        self.write_column_key(name)?;
        // binary format flag '='
        self.output.push(b'=');
        // binary format entity type
        self.output.push(ARRAY_BINARY_FORMAT_TYPE);
        // ndarr datatype
        self.output.push(D::type_tag());
        // ndarr dims
        // NOTE(review): `ndim <= MAX_ARRAY_DIMS` was checked above; the cast assumes
        // `MAX_ARRAY_DIMS` fits in a `u8` — TODO confirm.
        self.output.push(ndim as u8);

        // Reserve room for the shape header (one `u32` per dimension) plus the array
        // data in a single call, so the raw write below stays inside the allocation.
        let dim_header_size = size_of::<u32>() * ndim;
        self.output.reserve(dim_header_size + array_buf_size);

        for i in 0..ndim {
            // ndarr shape: each dimension size is written as a little-endian `u32`.
            // NOTE(review): `as u32` truncates silently; presumably the sizes were
            // range-checked by `check_and_get_array_bytes_size` — confirm.
            // NOTE(review): an error from `view.dim(i)` returns after the column key
            // and header bytes have already been appended to `self.output`.
            self.output
                .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
        }

        // Offset at which the array data starts.
        let index = self.output.len();
        // SAFETY: `reserve(dim_header_size + array_buf_size)` was called above and
        // exactly `dim_header_size` bytes have been appended since, so at least
        // `array_buf_size` bytes of spare capacity follow `index` in the same
        // allocation.
        // NOTE(review): these bytes are not initialized yet; this relies on
        // `write_array_data` only writing to the slice — confirm.
        let writeable =
            unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };

        // ndarr data
        // NOTE(review): on error the length is left at `index`, so the key and header
        // written above remain in the buffer without their array data.
        ndarr::write_array_data(view, writeable, array_buf_size)?;
        // SAFETY: the new length stays within the capacity reserved above.
        // NOTE(review): assumes a successful `write_array_data` filled all
        // `array_buf_size` bytes — confirm.
        unsafe { self.output.set_len(array_buf_size + index) }
        Ok(self)
    }
1085
1086 /// Record a timestamp value for the given column.
1087 ///
1088 /// ```no_run
1089 /// # use questdb::Result;
1090 /// # use questdb::ingress::{Buffer, SenderBuilder};
1091 /// use questdb::ingress::TimestampMicros;
1092 /// # fn main() -> Result<()> {
1093 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1094 /// # let mut buffer = sender.new_buffer();
1095 /// # buffer.table("x")?;
1096 /// buffer.column_ts("col_name", TimestampMicros::now())?;
1097 /// # Ok(())
1098 /// # }
1099 /// ```
1100 ///
1101 /// or
1102 ///
1103 /// ```no_run
1104 /// # use questdb::Result;
1105 /// # use questdb::ingress::{Buffer, SenderBuilder};
1106 /// use questdb::ingress::TimestampMicros;
1107 ///
1108 /// # fn main() -> Result<()> {
1109 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1110 /// # let mut buffer = sender.new_buffer();
1111 /// # buffer.table("x")?;
1112 /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1113 /// # Ok(())
1114 /// # }
1115 /// ```
1116 ///
1117 /// or
1118 ///
1119 /// ```no_run
1120 /// # use questdb::Result;
1121 /// # use questdb::ingress::{Buffer, SenderBuilder};
1122 /// use questdb::ingress::TimestampMicros;
1123 /// use questdb::ingress::ColumnName;
1124 ///
1125 /// # fn main() -> Result<()> {
1126 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1127 /// # let mut buffer = sender.new_buffer();
1128 /// # buffer.table("x")?;
1129 /// let col_name = ColumnName::new("col_name")?;
1130 /// buffer.column_ts(col_name, TimestampMicros::now())?;
1131 /// # Ok(())
1132 /// # }
1133 /// ```
1134 ///
1135 /// or you can also pass in a `TimestampNanos`.
1136 ///
1137 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1138 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1139 ///
1140 /// This last option requires the `chrono_timestamp` feature.
1141 pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1142 where
1143 N: TryInto<ColumnName<'a>>,
1144 T: TryInto<Timestamp>,
1145 Error: From<N::Error>,
1146 Error: From<T::Error>,
1147 {
1148 self.write_column_key(name)?;
1149 let timestamp: Timestamp = value.try_into()?;
1150 let mut buf = itoa::Buffer::new();
1151 match timestamp {
1152 Timestamp::Micros(ts) => {
1153 let printed = buf.format(ts.as_i64());
1154 self.output.extend_from_slice(printed.as_bytes());
1155 self.output.push(b't');
1156 }
1157 Timestamp::Nanos(ts) => {
1158 let printed = buf.format(ts.as_i64());
1159 self.output.extend_from_slice(printed.as_bytes());
1160 self.output.push(b'n');
1161 }
1162 }
1163 Ok(self)
1164 }
1165
1166 /// Complete the current row with the designated timestamp. After this call, you can
1167 /// start recording the next row by calling [Buffer::table] again, or you can send
1168 /// the accumulated batch by calling [Sender::flush] or one of its variants.
1169 ///
1170 /// ```no_run
1171 /// # use questdb::Result;
1172 /// # use questdb::ingress::{Buffer, SenderBuilder};
1173 /// use questdb::ingress::TimestampNanos;
1174 /// # fn main() -> Result<()> {
1175 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1176 /// # let mut buffer = sender.new_buffer();
1177 /// # buffer.table("x")?.symbol("a", "b")?;
1178 /// buffer.at(TimestampNanos::now())?;
1179 /// # Ok(())
1180 /// # }
1181 /// ```
1182 ///
1183 /// or
1184 ///
1185 /// ```no_run
1186 /// # use questdb::Result;
1187 /// # use questdb::ingress::{Buffer, SenderBuilder};
1188 /// use questdb::ingress::TimestampNanos;
1189 ///
1190 /// # fn main() -> Result<()> {
1191 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1192 /// # let mut buffer = sender.new_buffer();
1193 /// # buffer.table("x")?.symbol("a", "b")?;
1194 /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1195 /// # Ok(())
1196 /// # }
1197 /// ```
1198 ///
1199 /// You can also pass in a `TimestampMicros`.
1200 ///
1201 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1202 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1203 ///
1204 pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1205 where
1206 T: TryInto<Timestamp>,
1207 Error: From<T::Error>,
1208 {
1209 self.check_op(Op::At)?;
1210 let timestamp: Timestamp = timestamp.try_into()?;
1211
1212 // https://github.com/rust-lang/rust/issues/115880
1213 let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1214 let timestamp: TimestampNanos = timestamp?;
1215
1216 let epoch_nanos = timestamp.as_i64();
1217 if epoch_nanos < 0 {
1218 return Err(error::fmt!(
1219 InvalidTimestamp,
1220 "Timestamp {} is negative. It must be >= 0.",
1221 epoch_nanos
1222 ));
1223 }
1224 let mut buf = itoa::Buffer::new();
1225 let printed = buf.format(epoch_nanos);
1226 self.output.push(b' ');
1227 self.output.extend_from_slice(printed.as_bytes());
1228 self.output.push(b'\n');
1229 self.state.op_case = OpCase::MayFlushOrTable;
1230 self.state.row_count += 1;
1231 Ok(())
1232 }
1233
1234 /// Complete the current row without providing a timestamp. The QuestDB instance
1235 /// will insert its own timestamp.
1236 ///
1237 /// Letting the server assign the timestamp can be faster since it reliably avoids
1238 /// out-of-order operations in the database for maximum ingestion throughput. However,
1239 /// it removes the ability to deduplicate rows.
1240 ///
1241 /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1242 /// server will set the timestamp only after receiving the row. If you're flushing
1243 /// infrequently, the server-assigned timestamp may be significantly behind the
1244 /// time the data was recorded in the buffer.
1245 ///
1246 /// In almost all cases, you should prefer the [Buffer::at] function.
1247 ///
1248 /// After this call, you can start recording the next row by calling [Buffer::table]
1249 /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1250 /// its variants.
1251 ///
1252 /// ```no_run
1253 /// # use questdb::Result;
1254 /// # use questdb::ingress::{Buffer, SenderBuilder};
1255 /// # fn main() -> Result<()> {
1256 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1257 /// # let mut buffer = sender.new_buffer();
1258 /// # buffer.table("x")?.symbol("a", "b")?;
1259 /// buffer.at_now()?;
1260 /// # Ok(())
1261 /// # }
1262 /// ```
1263 pub fn at_now(&mut self) -> crate::Result<()> {
1264 self.check_op(Op::At)?;
1265 self.output.push(b'\n');
1266 self.state.op_case = OpCase::MayFlushOrTable;
1267 self.state.row_count += 1;
1268 Ok(())
1269 }
1270}
1271
1272impl Debug for Buffer {
1273 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1274 f.debug_struct("Buffer")
1275 .field("output", &DebugBytes(&self.output))
1276 .field("state", &self.state)
1277 .field("marker", &self.marker)
1278 .field("max_name_len", &self.max_name_len)
1279 .field("protocol_version", &self.protocol_version)
1280 .finish()
1281 }
1282}