questdb/ingress/buffer.rs
1/*******************************************************************************
2 * ___ _ ____ ____
3 * / _ \ _ _ ___ ___| |_| _ \| __ )
4 * | | | | | | |/ _ \/ __| __| | | | _ \
5 * | |_| | |_| | __/\__ \ |_| |_| | |_) |
6 * \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 * Copyright (c) 2014-2019 Appsicle
9 * Copyright (c) 2019-2025 QuestDB
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 *
23 ******************************************************************************/
24use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
25use crate::ingress::{
26 ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros,
27 TimestampNanos, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS,
28 MAX_NAME_LEN_DEFAULT,
29};
30use crate::{error, Error};
31use std::fmt::{Debug, Formatter};
32use std::num::NonZeroUsize;
33use std::slice::from_raw_parts_mut;
34
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` once before and once after the
/// (possibly escaped) text is written, which lets the caller emit an opening
/// and a closing delimiter. Pass a no-op closure for unquoted text.
///
/// The input is scanned once up front to count the bytes that need escaping,
/// so that the output vector is grown at most once.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        // Fast path: nothing to escape, copy the text verbatim.
        output.extend_from_slice(s.as_bytes());
    } else {
        // Reserve the exact final size once; the pushes below never reallocate
        // and, unlike `set_len` + unchecked writes, never expose uninitialized
        // bytes through the vector.
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
74
/// Tells whether byte `c` must be preceded by a backslash when it appears in
/// unquoted text: space, comma, equals sign, line feed, carriage return or
/// backslash.
fn must_escape_unquoted(c: u8) -> bool {
    b" ,=\n\r\\".contains(&c)
}
78
/// Tells whether byte `c` must be preceded by a backslash when it appears
/// inside double-quoted text: line feed, carriage return, double quote or
/// backslash.
fn must_escape_quoted(c: u8) -> bool {
    b"\n\r\"\\".contains(&c)
}
82
/// Appends `s` to `output` without surrounding delimiters, backslash-escaping
/// every byte flagged by `must_escape_unquoted`.
fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
    // The quoting callback is a no-op: unquoted text gets no delimiters.
    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
}
86
/// Appends `s` to `output` wrapped in double quotes, backslash-escaping every
/// byte flagged by `must_escape_quoted`.
fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
    // The quoting callback runs before and after the text, emitting the
    // opening and the closing `"`.
    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
}
90
91pub(crate) struct F64Serializer {
92 buf: ryu::Buffer,
93 n: f64,
94}
95
96impl F64Serializer {
97 pub(crate) fn new(n: f64) -> Self {
98 F64Serializer {
99 buf: ryu::Buffer::new(),
100 n,
101 }
102 }
103
104 // This function was taken and customized from the ryu crate.
105 #[cold]
106 fn format_nonfinite(&self) -> &'static str {
107 const MANTISSA_MASK: u64 = 0x000fffffffffffff;
108 const SIGN_MASK: u64 = 0x8000000000000000;
109 let bits = self.n.to_bits();
110 if bits & MANTISSA_MASK != 0 {
111 "NaN"
112 } else if bits & SIGN_MASK != 0 {
113 "-Infinity"
114 } else {
115 "Infinity"
116 }
117 }
118
119 pub(crate) fn as_str(&mut self) -> &str {
120 if self.n.is_finite() {
121 self.buf.format_finite(self.n)
122 } else {
123 self.format_nonfinite()
124 }
125 }
126}
127
/// A buffer operation. Each variant occupies its own bit so that sets of
/// operations can be expressed as a bitwise OR of discriminants.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// Returns the lower-case name of the operation, as used in error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Flush => "flush",
            Self::At => "at",
            Self::Column => "column",
            Self::Symbol => "symbol",
            Self::Table => "table",
        }
    }
}
148
149#[derive(Debug, Copy, Clone, PartialEq)]
150enum OpCase {
151 Init = Op::Table as isize,
152 TableWritten = Op::Symbol as isize | Op::Column as isize,
153 SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
154 ColumnWritten = Op::Column as isize | Op::At as isize,
155 MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
156}
157
158impl OpCase {
159 fn next_op_descr(self) -> &'static str {
160 match self {
161 OpCase::Init => "should have called `table` instead",
162 OpCase::TableWritten => "should have called `symbol` or `column` instead",
163 OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
164 OpCase::ColumnWritten => "should have called `column` or `at` instead",
165 OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
166 }
167 }
168}
169
170// IMPORTANT: This struct MUST remain `Copy` to ensure that
171// there are no heap allocations when performing marker operations.
172#[derive(Debug, Clone, Copy)]
173struct BufferState {
174 op_case: OpCase,
175 row_count: usize,
176 first_table_len: Option<NonZeroUsize>,
177 transactional: bool,
178}
179
180impl BufferState {
181 fn new() -> Self {
182 Self {
183 op_case: OpCase::Init,
184 row_count: 0,
185 first_table_len: None,
186 transactional: true,
187 }
188 }
189}
190
191/// A validated table name.
192///
193/// This type simply wraps a `&str`.
194///
195/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
196/// it doesn't have to validate it again. This saves CPU cycles.
197#[derive(Clone, Copy)]
198pub struct TableName<'a> {
199 name: &'a str,
200}
201
202impl<'a> TableName<'a> {
203 /// Construct a validated table name.
204 pub fn new(name: &'a str) -> crate::Result<Self> {
205 if name.is_empty() {
206 return Err(error::fmt!(
207 InvalidName,
208 "Table names must have a non-zero length."
209 ));
210 }
211
212 let mut prev = '\0';
213 for (index, c) in name.chars().enumerate() {
214 match c {
215 '.' => {
216 if index == 0 || index == name.len() - 1 || prev == '.' {
217 return Err(error::fmt!(
218 InvalidName,
219 concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
220 name,
221 index
222 ));
223 }
224 }
225 '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
226 | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
227 | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
228 | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
229 return Err(error::fmt!(
230 InvalidName,
231 concat!(
232 "Bad string {:?}: ",
233 "Table names can't contain ",
234 "a {:?} character, which was found at ",
235 "byte position {}."
236 ),
237 name,
238 c,
239 index
240 ));
241 }
242 '\u{feff}' => {
243 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
244 // aka UTF-8 BOM if it appears anywhere in the string.
245 return Err(error::fmt!(
246 InvalidName,
247 concat!(
248 "Bad string {:?}: ",
249 "Table names can't contain ",
250 "a UTF-8 BOM character, which was found at ",
251 "byte position {}."
252 ),
253 name,
254 index
255 ));
256 }
257 _ => (),
258 }
259 prev = c;
260 }
261
262 Ok(Self { name })
263 }
264
265 /// Construct a table name without validating it.
266 ///
267 /// This breaks API encapsulation and is only intended for use
268 /// when the string was already previously validated.
269 ///
270 /// The QuestDB server will reject an invalid table name.
271 pub fn new_unchecked(name: &'a str) -> Self {
272 Self { name }
273 }
274}
275
/// Allows a plain `&str` to be passed wherever a [`TableName`] is expected;
/// the conversion validates the string via [`TableName::new`].
impl<'a> TryFrom<&'a str> for TableName<'a> {
    type Error = Error;

    /// Validates `name` and wraps it, returning the validation error on failure.
    fn try_from(name: &'a str) -> crate::Result<Self> {
        Self::new(name)
    }
}
283
/// Gives read access to the wrapped table name string.
impl<'a> AsRef<str> for TableName<'a> {
    /// Returns the wrapped name.
    fn as_ref(&self) -> &str {
        self.name
    }
}
289
290/// A validated column name.
291///
292/// This type simply wraps a `&str`.
293///
294/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
295/// it doesn't have to validate it again. This saves CPU cycles.
296#[derive(Clone, Copy)]
297pub struct ColumnName<'a> {
298 name: &'a str,
299}
300
301impl<'a> ColumnName<'a> {
302 /// Construct a validated table name.
303 pub fn new(name: &'a str) -> crate::Result<Self> {
304 if name.is_empty() {
305 return Err(error::fmt!(
306 InvalidName,
307 "Column names must have a non-zero length."
308 ));
309 }
310
311 for (index, c) in name.chars().enumerate() {
312 match c {
313 '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
314 | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
315 | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
316 | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
317 return Err(error::fmt!(
318 InvalidName,
319 concat!(
320 "Bad string {:?}: ",
321 "Column names can't contain ",
322 "a {:?} character, which was found at ",
323 "byte position {}."
324 ),
325 name,
326 c,
327 index
328 ));
329 }
330 '\u{FEFF}' => {
331 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
332 // aka UTF-8 BOM if it appears anywhere in the string.
333 return Err(error::fmt!(
334 InvalidName,
335 concat!(
336 "Bad string {:?}: ",
337 "Column names can't contain ",
338 "a UTF-8 BOM character, which was found at ",
339 "byte position {}."
340 ),
341 name,
342 index
343 ));
344 }
345 _ => (),
346 }
347 }
348
349 Ok(Self { name })
350 }
351
352 /// Construct a column name without validating it.
353 ///
354 /// This breaks API encapsulation and is only intended for use
355 /// when the string was already previously validated.
356 ///
357 /// The QuestDB server will reject an invalid column name.
358 pub fn new_unchecked(name: &'a str) -> Self {
359 Self { name }
360 }
361}
362
/// Allows a plain `&str` to be passed wherever a [`ColumnName`] is expected;
/// the conversion validates the string via [`ColumnName::new`].
impl<'a> TryFrom<&'a str> for ColumnName<'a> {
    type Error = Error;

    /// Validates `name` and wraps it, returning the validation error on failure.
    fn try_from(name: &'a str) -> crate::Result<Self> {
        Self::new(name)
    }
}
370
/// Gives read access to the wrapped column name string.
impl<'a> AsRef<str> for ColumnName<'a> {
    /// Returns the wrapped name.
    fn as_ref(&self) -> &str {
        self.name
    }
}
376
377/// A reusable buffer to prepare a batch of ILP messages.
378///
379/// # Example
380///
381/// ```no_run
382/// # use questdb::Result;
383/// # use questdb::ingress::SenderBuilder;
384///
385/// # fn main() -> Result<()> {
386/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
387/// # use questdb::Result;
388/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
389/// let mut buffer = sender.new_buffer();
390///
391/// // first row
392/// buffer
393/// .table("table1")?
394/// .symbol("bar", "baz")?
395/// .column_bool("a", false)?
396/// .column_i64("b", 42)?
397/// .column_f64("c", 3.14)?
398/// .column_str("d", "hello")?
399/// .column_ts("e", TimestampMicros::now())?
400/// .at(TimestampNanos::now())?;
401///
402/// // second row
403/// buffer
404/// .table("table2")?
405/// .symbol("foo", "bar")?
406/// .at(TimestampNanos::now())?;
407/// # Ok(())
408/// # }
409/// ```
410///
411/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
412///
413/// # Sequential Coupling
414/// The Buffer API is sequentially coupled:
415/// * A row always starts with [`table`](Buffer::table).
416/// * A row must contain at least one [`symbol`](Buffer::symbol) or
417/// column (
418/// [`column_bool`](Buffer::column_bool),
419/// [`column_i64`](Buffer::column_i64),
420/// [`column_f64`](Buffer::column_f64),
421/// [`column_str`](Buffer::column_str),
422/// [`column_arr`](Buffer::column_arr),
423/// [`column_ts`](Buffer::column_ts)).
424/// * Symbols must appear before columns.
425/// * A row must be terminated with either [`at`](Buffer::at) or
426/// [`at_now`](Buffer::at_now).
427///
428/// This diagram visualizes the sequence:
429///
430/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
431///
432/// # Buffer method calls, Serialized ILP types and QuestDB types
433///
434/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
435/// |---------------|--------------------------------------------------------------|
436/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
437/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
438/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
439/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
440/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
441/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
442/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
443///
444/// QuestDB supports both `STRING` and `SYMBOL` column types.
445///
446/// To understand the difference, refer to the
447/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
448/// symbols are interned strings, most suitable for identifiers that are repeated many
449/// times throughout the column. They offer an advantage in storage space and query
450/// performance.
451///
452/// # Inserting NULL values
453///
454/// To insert a NULL value, skip the symbol or column for that row.
455///
456/// # Recovering from validation errors
457///
458/// If you want to recover from potential validation errors, call
459/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
460/// append as many rows or parts of rows as you like, and then call
461/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
462///
463/// If there was an error in one of the rows, use
464/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
465/// marked last known good state.
466///
#[derive(Clone)]
pub struct Buffer {
    /// Serialized bytes accumulated so far.
    output: Vec<u8>,
    /// Call-sequence state plus row and table bookkeeping for `output`.
    state: BufferState,
    /// Rewind point recorded by `set_marker`: the length of `output` and the
    /// state at the time the marker was set.
    marker: Option<(usize, BufferState)>,
    /// Upper bound for table and column names, compared against the name's
    /// length in bytes.
    max_name_len: usize,
    /// Protocol version the buffer was created with.
    protocol_version: ProtocolVersion,
}
475
476impl Buffer {
    /// Creates a new [`Buffer`] with default parameters.
    ///
    /// - Uses the specified protocol version
    /// - Sets maximum name length to **127 characters** (QuestDB server default)
    ///
    /// This is equivalent to [`Sender::new_buffer`] when the sender's
    /// [`Sender::protocol_version`] is the same as `protocol_version` and its
    /// [`Sender::max_name_len`] is 127.
    ///
    /// For custom name lengths, use [`Self::with_max_name_len`].
    pub fn new(protocol_version: ProtocolVersion) -> Self {
        Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
    }
489
490 /// Creates a new [`Buffer`] with a custom maximum name length.
491 ///
492 /// - `max_name_len`: Maximum allowed length for table/column names, match
493 /// your QuestDB server's `cairo.max.file.name.length` configuration
494 /// - `protocol_version`: Protocol version to use
495 ///
496 /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
497 /// and [`Sender::max_name_len`].
498 ///
499 /// For the default max name length limit (127), use [`Self::new`].
500 pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
501 Self {
502 output: Vec::new(),
503 state: BufferState::new(),
504 marker: None,
505 max_name_len,
506 protocol_version,
507 }
508 }
509
    /// Returns the protocol version configured for this buffer.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.protocol_version
    }
513
    /// Pre-allocate to ensure the buffer has enough capacity for at least the
    /// specified additional byte count. This may be rounded up.
    /// This does not allocate if such additional capacity is already satisfied.
    ///
    /// See also [`capacity`](Buffer::capacity).
    pub fn reserve(&mut self, additional: usize) {
        // Delegates to `Vec::reserve` on the underlying byte vector.
        self.output.reserve(additional);
    }
521
    /// Returns the number of bytes accumulated in the buffer.
    pub fn len(&self) -> usize {
        self.output.len()
    }
526
    /// Returns the number of rows accumulated in the buffer, as tracked in
    /// the buffer's state.
    pub fn row_count(&self) -> usize {
        self.state.row_count
    }
531
    /// Tells whether the buffer is transactional. It is transactional iff it contains
    /// data for at most one table. Additionally, you must send the buffer over HTTP to
    /// get transactional behavior.
    ///
    /// The flag is cleared by [`table`](Buffer::table) when a table name
    /// differing from the first one is written.
    pub fn transactional(&self) -> bool {
        self.state.transactional
    }
538
    /// Returns `true` if no bytes have been accumulated in the buffer.
    pub fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
542
    /// Returns the total number of bytes the buffer can hold before it needs
    /// to resize.
    pub fn capacity(&self) -> usize {
        self.output.capacity()
    }
547
    /// Returns the bytes accumulated in the buffer so far.
    pub fn as_bytes(&self) -> &[u8] {
        &self.output
    }
551
552 /// Mark a rewind point.
553 /// This allows undoing accumulated changes to the buffer for one or more
554 /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
555 /// Any previous marker will be discarded.
556 /// Once the marker is no longer needed, call
557 /// [`clear_marker`](Buffer::clear_marker).
558 pub fn set_marker(&mut self) -> crate::Result<()> {
559 if (self.state.op_case as isize & Op::Table as isize) == 0 {
560 return Err(error::fmt!(
561 InvalidApiCall,
562 concat!(
563 "Can't set the marker whilst constructing a line. ",
564 "A marker may only be set on an empty buffer or after ",
565 "`at` or `at_now` is called."
566 )
567 ));
568 }
569 self.marker = Some((self.output.len(), self.state));
570 Ok(())
571 }
572
573 /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
574 /// call.
575 ///
576 /// As a side effect, this also clears the marker.
577 pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
578 if let Some((position, state)) = self.marker.take() {
579 self.output.truncate(position);
580 self.state = state;
581 Ok(())
582 } else {
583 Err(error::fmt!(
584 InvalidApiCall,
585 "Can't rewind to the marker: No marker set."
586 ))
587 }
588 }
589
    /// Discard any marker as may have been set by
    /// [`set_marker`](Buffer::set_marker).
    ///
    /// The buffer's contents and state are left untouched.
    ///
    /// Idempotent.
    pub fn clear_marker(&mut self) {
        self.marker = None;
    }
597
598 /// Reset the buffer and clear contents whilst retaining
599 /// [`capacity`](Buffer::capacity).
600 pub fn clear(&mut self) {
601 self.output.clear();
602 self.state = BufferState::new();
603 self.marker = None;
604 }
605
606 /// Check if the next API operation is allowed as per the OP case state machine.
607 #[inline(always)]
608 fn check_op(&self, op: Op) -> crate::Result<()> {
609 if (self.state.op_case as isize & op as isize) > 0 {
610 Ok(())
611 } else {
612 Err(error::fmt!(
613 InvalidApiCall,
614 "State error: Bad call to `{}`, {}.",
615 op.descr(),
616 self.state.op_case.next_op_descr()
617 ))
618 }
619 }
620
    /// Checks if this buffer is ready to be flushed to a sender via one of the
    /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
    /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
    /// message indicating why this [`Buffer`] cannot be flushed at the moment.
    #[inline(always)]
    pub fn check_can_flush(&self) -> crate::Result<()> {
        // Flushing is modelled as one more operation of the state machine.
        self.check_op(Op::Flush)
    }
629
630 #[inline(always)]
631 fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
632 if name.len() > self.max_name_len {
633 return Err(error::fmt!(
634 InvalidName,
635 "Bad name: {:?}: Too long (max {} characters)",
636 name,
637 self.max_name_len
638 ));
639 }
640 Ok(())
641 }
642
643 /// Begin recording a new row for the given table.
644 ///
645 /// ```no_run
646 /// # use questdb::Result;
647 /// # use questdb::ingress::{Buffer, SenderBuilder};
648 /// # fn main() -> Result<()> {
649 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
650 /// # let mut buffer = sender.new_buffer();
651 /// buffer.table("table_name")?;
652 /// # Ok(())
653 /// # }
654 /// ```
655 ///
656 /// or
657 ///
658 /// ```no_run
659 /// # use questdb::Result;
660 /// # use questdb::ingress::{Buffer, SenderBuilder};
661 /// use questdb::ingress::TableName;
662 ///
663 /// # fn main() -> Result<()> {
664 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
665 /// # let mut buffer = sender.new_buffer();
666 /// let table_name = TableName::new("table_name")?;
667 /// buffer.table(table_name)?;
668 /// # Ok(())
669 /// # }
670 /// ```
671 pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
672 where
673 N: TryInto<TableName<'a>>,
674 Error: From<N::Error>,
675 {
676 let name: TableName<'a> = name.try_into()?;
677 self.validate_max_name_len(name.name)?;
678 self.check_op(Op::Table)?;
679 let table_begin = self.output.len();
680 write_escaped_unquoted(&mut self.output, name.name);
681 let table_end = self.output.len();
682 self.state.op_case = OpCase::TableWritten;
683
684 // A buffer stops being transactional if it targets multiple tables.
685 if let Some(first_table_len) = &self.state.first_table_len {
686 let first_table = &self.output[0..first_table_len.get()];
687 let this_table = &self.output[table_begin..table_end];
688 if first_table != this_table {
689 self.state.transactional = false;
690 }
691 } else {
692 debug_assert!(table_begin == 0);
693
694 // This is a bit confusing, so worth explaining:
695 // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
696 // but we know that `table_end` is never 0 here, we just need an option type
697 // anyway, so we don't bother unwrapping it to then wrap it again.
698 let first_table_len = NonZeroUsize::new(table_end);
699
700 // Instead we just assert that it's `Some`.
701 debug_assert!(first_table_len.is_some());
702
703 self.state.first_table_len = first_table_len;
704 }
705 Ok(self)
706 }
707
708 /// Record a symbol for the given column.
709 /// Make sure you record all symbol columns before any other column type.
710 ///
711 /// ```no_run
712 /// # use questdb::Result;
713 /// # use questdb::ingress::{Buffer, SenderBuilder};
714 /// # fn main() -> Result<()> {
715 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
716 /// # let mut buffer = sender.new_buffer();
717 /// # buffer.table("x")?;
718 /// buffer.symbol("col_name", "value")?;
719 /// # Ok(())
720 /// # }
721 /// ```
722 ///
723 /// or
724 ///
725 /// ```no_run
726 /// # use questdb::Result;
727 /// # use questdb::ingress::{Buffer, SenderBuilder};
728 /// # fn main() -> Result<()> {
729 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
730 /// # let mut buffer = sender.new_buffer();
731 /// # buffer.table("x")?;
732 /// let value: String = "value".to_owned();
733 /// buffer.symbol("col_name", value)?;
734 /// # Ok(())
735 /// # }
736 /// ```
737 ///
738 /// or
739 ///
740 /// ```no_run
741 /// # use questdb::Result;
742 /// # use questdb::ingress::{Buffer, SenderBuilder};
743 /// use questdb::ingress::ColumnName;
744 ///
745 /// # fn main() -> Result<()> {
746 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
747 /// # let mut buffer = sender.new_buffer();
748 /// # buffer.table("x")?;
749 /// let col_name = ColumnName::new("col_name")?;
750 /// buffer.symbol(col_name, "value")?;
751 /// # Ok(())
752 /// # }
753 /// ```
754 ///
755 pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
756 where
757 N: TryInto<ColumnName<'a>>,
758 S: AsRef<str>,
759 Error: From<N::Error>,
760 {
761 let name: ColumnName<'a> = name.try_into()?;
762 self.validate_max_name_len(name.name)?;
763 self.check_op(Op::Symbol)?;
764 self.output.push(b',');
765 write_escaped_unquoted(&mut self.output, name.name);
766 self.output.push(b'=');
767 write_escaped_unquoted(&mut self.output, value.as_ref());
768 self.state.op_case = OpCase::SymbolWritten;
769 Ok(self)
770 }
771
772 fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
773 where
774 N: TryInto<ColumnName<'a>>,
775 Error: From<N::Error>,
776 {
777 let name: ColumnName<'a> = name.try_into()?;
778 self.validate_max_name_len(name.name)?;
779 self.check_op(Op::Column)?;
780 self.output
781 .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
782 b' '
783 } else {
784 b','
785 });
786 write_escaped_unquoted(&mut self.output, name.name);
787 self.output.push(b'=');
788 self.state.op_case = OpCase::ColumnWritten;
789 Ok(self)
790 }
791
792 /// Record a boolean value for the given column.
793 ///
794 /// ```no_run
795 /// # use questdb::Result;
796 /// # use questdb::ingress::{Buffer, SenderBuilder};
797 /// # fn main() -> Result<()> {
798 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
799 /// # let mut buffer = sender.new_buffer();
800 /// # buffer.table("x")?;
801 /// buffer.column_bool("col_name", true)?;
802 /// # Ok(())
803 /// # }
804 /// ```
805 ///
806 /// or
807 ///
808 /// ```no_run
809 /// # use questdb::Result;
810 /// # use questdb::ingress::{Buffer, SenderBuilder};
811 /// use questdb::ingress::ColumnName;
812 ///
813 /// # fn main() -> Result<()> {
814 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
815 /// # let mut buffer = sender.new_buffer();
816 /// # buffer.table("x")?;
817 /// let col_name = ColumnName::new("col_name")?;
818 /// buffer.column_bool(col_name, true)?;
819 /// # Ok(())
820 /// # }
821 /// ```
822 pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
823 where
824 N: TryInto<ColumnName<'a>>,
825 Error: From<N::Error>,
826 {
827 self.write_column_key(name)?;
828 self.output.push(if value { b't' } else { b'f' });
829 Ok(self)
830 }
831
832 /// Record an integer value for the given column.
833 ///
834 /// ```no_run
835 /// # use questdb::Result;
836 /// # use questdb::ingress::{Buffer, SenderBuilder};
837 /// # fn main() -> Result<()> {
838 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
839 /// # let mut buffer = sender.new_buffer();
840 /// # buffer.table("x")?;
841 /// buffer.column_i64("col_name", 42)?;
842 /// # Ok(())
843 /// # }
844 /// ```
845 ///
846 /// or
847 ///
848 /// ```no_run
849 /// # use questdb::Result;
850 /// # use questdb::ingress::{Buffer, SenderBuilder};
851 /// use questdb::ingress::ColumnName;
852 ///
853 /// # fn main() -> Result<()> {
854 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
855 /// # let mut buffer = sender.new_buffer();
856 /// # buffer.table("x")?;
857 /// let col_name = ColumnName::new("col_name")?;
858 /// buffer.column_i64(col_name, 42)?;
859 /// # Ok(())
860 /// # }
861 /// ```
862 pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
863 where
864 N: TryInto<ColumnName<'a>>,
865 Error: From<N::Error>,
866 {
867 self.write_column_key(name)?;
868 let mut buf = itoa::Buffer::new();
869 let printed = buf.format(value);
870 self.output.extend_from_slice(printed.as_bytes());
871 self.output.push(b'i');
872 Ok(self)
873 }
874
875 /// Record a floating point value for the given column.
876 ///
877 /// ```no_run
878 /// # use questdb::Result;
879 /// # use questdb::ingress::{Buffer, SenderBuilder};
880 /// # fn main() -> Result<()> {
881 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
882 /// # let mut buffer = sender.new_buffer();
883 /// # buffer.table("x")?;
884 /// buffer.column_f64("col_name", 3.14)?;
885 /// # Ok(())
886 /// # }
887 /// ```
888 ///
889 /// or
890 ///
891 /// ```no_run
892 /// # use questdb::Result;
893 /// # use questdb::ingress::{Buffer, SenderBuilder};
894 /// use questdb::ingress::ColumnName;
895 ///
896 /// # fn main() -> Result<()> {
897 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
898 /// # let mut buffer = sender.new_buffer();
899 /// # buffer.table("x")?;
900 /// let col_name = ColumnName::new("col_name")?;
901 /// buffer.column_f64(col_name, 3.14)?;
902 /// # Ok(())
903 /// # }
904 /// ```
905 pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
906 where
907 N: TryInto<ColumnName<'a>>,
908 Error: From<N::Error>,
909 {
910 self.write_column_key(name)?;
911 if !matches!(self.protocol_version, ProtocolVersion::V1) {
912 self.output.push(b'=');
913 self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
914 self.output.extend_from_slice(&value.to_le_bytes())
915 } else {
916 let mut ser = F64Serializer::new(value);
917 self.output.extend_from_slice(ser.as_str().as_bytes())
918 }
919 Ok(self)
920 }
921
922 /// Record a string value for the given column.
923 ///
924 /// ```no_run
925 /// # use questdb::Result;
926 /// # use questdb::ingress::{Buffer, SenderBuilder};
927 /// # fn main() -> Result<()> {
928 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
929 /// # let mut buffer = sender.new_buffer();
930 /// # buffer.table("x")?;
931 /// buffer.column_str("col_name", "value")?;
932 /// # Ok(())
933 /// # }
934 /// ```
935 ///
936 /// or
937 ///
938 /// ```no_run
939 /// # use questdb::Result;
940 /// # use questdb::ingress::{Buffer, SenderBuilder};
941 /// # fn main() -> Result<()> {
942 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
943 /// # let mut buffer = sender.new_buffer();
944 /// # buffer.table("x")?;
945 /// let value: String = "value".to_owned();
946 /// buffer.column_str("col_name", value)?;
947 /// # Ok(())
948 /// # }
949 /// ```
950 ///
951 /// or
952 ///
953 /// ```no_run
954 /// # use questdb::Result;
955 /// # use questdb::ingress::{Buffer, SenderBuilder};
956 /// use questdb::ingress::ColumnName;
957 ///
958 /// # fn main() -> Result<()> {
959 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
960 /// # let mut buffer = sender.new_buffer();
961 /// # buffer.table("x")?;
962 /// let col_name = ColumnName::new("col_name")?;
963 /// buffer.column_str(col_name, "value")?;
964 /// # Ok(())
965 /// # }
966 /// ```
967 pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
968 where
969 N: TryInto<ColumnName<'a>>,
970 S: AsRef<str>,
971 Error: From<N::Error>,
972 {
973 self.write_column_key(name)?;
974 write_escaped_quoted(&mut self.output, value.as_ref());
975 Ok(self)
976 }
977
978 /// Record a multidimensional array value for the given column.
979 ///
980 /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
981 /// be of type `f64`, which is currently the only supported data type.
982 ///
983 /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
984 ///
985 /// # Examples
986 ///
987 /// Recording a 2D array using slices:
988 ///
989 /// ```no_run
990 /// # use questdb::Result;
991 /// # use questdb::ingress::{Buffer, SenderBuilder};
992 /// # fn main() -> Result<()> {
993 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
994 /// # let mut buffer = sender.new_buffer();
995 /// # buffer.table("x")?;
996 /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
997 /// buffer.column_arr("array_col", &array_2d)?;
998 /// # Ok(())
999 /// # }
1000 /// ```
1001 ///
1002 /// Recording a 3D array using vectors:
1003 ///
1004 /// ```no_run
1005 /// # use questdb::Result;
1006 /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
1007 /// # fn main() -> Result<()> {
1008 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1009 /// # let mut buffer = sender.new_buffer();
1010 /// # buffer.table("x1")?;
1011 /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
1012 /// let col_name = ColumnName::new("col1")?;
1013 /// buffer.column_arr(col_name, &array_3d)?;
1014 /// # Ok(())
1015 /// # }
1016 /// ```
1017 ///
1018 /// # Errors
1019 ///
1020 /// Returns [`Error`] if:
1021 /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
1022 /// - Failed to get dimension sizes
1023 /// - Column name validation fails
1024 /// - Protocol version v1 is used (arrays require v2+)
1025 #[allow(private_bounds)]
1026 pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
1027 where
1028 N: TryInto<ColumnName<'a>>,
1029 T: NdArrayView<D>,
1030 D: ArrayElement + ArrayElementSealed,
1031 Error: From<N::Error>,
1032 {
1033 if self.protocol_version == ProtocolVersion::V1 {
1034 return Err(error::fmt!(
1035 ProtocolVersionError,
1036 "Protocol version v1 does not support array datatype",
1037 ));
1038 }
1039 let ndim = view.ndim();
1040 if ndim == 0 {
1041 return Err(error::fmt!(
1042 ArrayError,
1043 "Zero-dimensional arrays are not supported",
1044 ));
1045 }
1046
1047 // check dimension less equal than max dims
1048 if MAX_ARRAY_DIMS < ndim {
1049 return Err(error::fmt!(
1050 ArrayError,
1051 "Array dimension mismatch: expected at most {} dimensions, but got {}",
1052 MAX_ARRAY_DIMS,
1053 ndim
1054 ));
1055 }
1056
1057 let array_buf_size = check_and_get_array_bytes_size(view)?;
1058 self.write_column_key(name)?;
1059 // binary format flag '='
1060 self.output.push(b'=');
1061 // binary format entity type
1062 self.output.push(ARRAY_BINARY_FORMAT_TYPE);
1063 // ndarr datatype
1064 self.output.push(D::type_tag());
1065 // ndarr dims
1066 self.output.push(ndim as u8);
1067
1068 let dim_header_size = size_of::<u32>() * ndim;
1069 self.output.reserve(dim_header_size + array_buf_size);
1070
1071 for i in 0..ndim {
1072 // ndarr shape
1073 self.output
1074 .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
1075 }
1076
1077 let index = self.output.len();
1078 let writeable =
1079 unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
1080
1081 // ndarr data
1082 ndarr::write_array_data(view, writeable, array_buf_size)?;
1083 unsafe { self.output.set_len(array_buf_size + index) }
1084 Ok(self)
1085 }
1086
1087 /// Record a timestamp value for the given column.
1088 ///
1089 /// ```no_run
1090 /// # use questdb::Result;
1091 /// # use questdb::ingress::{Buffer, SenderBuilder};
1092 /// use questdb::ingress::TimestampMicros;
1093 /// # fn main() -> Result<()> {
1094 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1095 /// # let mut buffer = sender.new_buffer();
1096 /// # buffer.table("x")?;
1097 /// buffer.column_ts("col_name", TimestampMicros::now())?;
1098 /// # Ok(())
1099 /// # }
1100 /// ```
1101 ///
1102 /// or
1103 ///
1104 /// ```no_run
1105 /// # use questdb::Result;
1106 /// # use questdb::ingress::{Buffer, SenderBuilder};
1107 /// use questdb::ingress::TimestampMicros;
1108 ///
1109 /// # fn main() -> Result<()> {
1110 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1111 /// # let mut buffer = sender.new_buffer();
1112 /// # buffer.table("x")?;
1113 /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1114 /// # Ok(())
1115 /// # }
1116 /// ```
1117 ///
1118 /// or
1119 ///
1120 /// ```no_run
1121 /// # use questdb::Result;
1122 /// # use questdb::ingress::{Buffer, SenderBuilder};
1123 /// use questdb::ingress::TimestampMicros;
1124 /// use questdb::ingress::ColumnName;
1125 ///
1126 /// # fn main() -> Result<()> {
1127 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1128 /// # let mut buffer = sender.new_buffer();
1129 /// # buffer.table("x")?;
1130 /// let col_name = ColumnName::new("col_name")?;
1131 /// buffer.column_ts(col_name, TimestampMicros::now())?;
1132 /// # Ok(())
1133 /// # }
1134 /// ```
1135 ///
1136 /// or you can also pass in a `TimestampNanos`.
1137 ///
1138 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1139 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1140 ///
1141 /// This last option requires the `chrono_timestamp` feature.
1142 pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1143 where
1144 N: TryInto<ColumnName<'a>>,
1145 T: TryInto<Timestamp>,
1146 Error: From<N::Error>,
1147 Error: From<T::Error>,
1148 {
1149 self.write_column_key(name)?;
1150 let timestamp: Timestamp = value.try_into()?;
1151 let timestamp: TimestampMicros = timestamp.try_into()?;
1152 let mut buf = itoa::Buffer::new();
1153 let printed = buf.format(timestamp.as_i64());
1154 self.output.extend_from_slice(printed.as_bytes());
1155 self.output.push(b't');
1156 Ok(self)
1157 }
1158
1159 /// Complete the current row with the designated timestamp. After this call, you can
1160 /// start recording the next row by calling [Buffer::table] again, or you can send
1161 /// the accumulated batch by calling [Sender::flush] or one of its variants.
1162 ///
1163 /// ```no_run
1164 /// # use questdb::Result;
1165 /// # use questdb::ingress::{Buffer, SenderBuilder};
1166 /// use questdb::ingress::TimestampNanos;
1167 /// # fn main() -> Result<()> {
1168 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1169 /// # let mut buffer = sender.new_buffer();
1170 /// # buffer.table("x")?.symbol("a", "b")?;
1171 /// buffer.at(TimestampNanos::now())?;
1172 /// # Ok(())
1173 /// # }
1174 /// ```
1175 ///
1176 /// or
1177 ///
1178 /// ```no_run
1179 /// # use questdb::Result;
1180 /// # use questdb::ingress::{Buffer, SenderBuilder};
1181 /// use questdb::ingress::TimestampNanos;
1182 ///
1183 /// # fn main() -> Result<()> {
1184 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1185 /// # let mut buffer = sender.new_buffer();
1186 /// # buffer.table("x")?.symbol("a", "b")?;
1187 /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1188 /// # Ok(())
1189 /// # }
1190 /// ```
1191 ///
1192 /// You can also pass in a `TimestampMicros`.
1193 ///
1194 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1195 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1196 ///
1197 pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1198 where
1199 T: TryInto<Timestamp>,
1200 Error: From<T::Error>,
1201 {
1202 self.check_op(Op::At)?;
1203 let timestamp: Timestamp = timestamp.try_into()?;
1204
1205 // https://github.com/rust-lang/rust/issues/115880
1206 let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1207 let timestamp: TimestampNanos = timestamp?;
1208
1209 let epoch_nanos = timestamp.as_i64();
1210 if epoch_nanos < 0 {
1211 return Err(error::fmt!(
1212 InvalidTimestamp,
1213 "Timestamp {} is negative. It must be >= 0.",
1214 epoch_nanos
1215 ));
1216 }
1217 let mut buf = itoa::Buffer::new();
1218 let printed = buf.format(epoch_nanos);
1219 self.output.push(b' ');
1220 self.output.extend_from_slice(printed.as_bytes());
1221 self.output.push(b'\n');
1222 self.state.op_case = OpCase::MayFlushOrTable;
1223 self.state.row_count += 1;
1224 Ok(())
1225 }
1226
1227 /// Complete the current row without providing a timestamp. The QuestDB instance
1228 /// will insert its own timestamp.
1229 ///
1230 /// Letting the server assign the timestamp can be faster since it reliably avoids
1231 /// out-of-order operations in the database for maximum ingestion throughput. However,
1232 /// it removes the ability to deduplicate rows.
1233 ///
1234 /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1235 /// server will set the timestamp only after receiving the row. If you're flushing
1236 /// infrequently, the server-assigned timestamp may be significantly behind the
1237 /// time the data was recorded in the buffer.
1238 ///
1239 /// In almost all cases, you should prefer the [Buffer::at] function.
1240 ///
1241 /// After this call, you can start recording the next row by calling [Buffer::table]
1242 /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1243 /// its variants.
1244 ///
1245 /// ```no_run
1246 /// # use questdb::Result;
1247 /// # use questdb::ingress::{Buffer, SenderBuilder};
1248 /// # fn main() -> Result<()> {
1249 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1250 /// # let mut buffer = sender.new_buffer();
1251 /// # buffer.table("x")?.symbol("a", "b")?;
1252 /// buffer.at_now()?;
1253 /// # Ok(())
1254 /// # }
1255 /// ```
1256 pub fn at_now(&mut self) -> crate::Result<()> {
1257 self.check_op(Op::At)?;
1258 self.output.push(b'\n');
1259 self.state.op_case = OpCase::MayFlushOrTable;
1260 self.state.row_count += 1;
1261 Ok(())
1262 }
1263}
1264
1265impl Debug for Buffer {
1266 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1267 f.debug_struct("Buffer")
1268 .field("output", &DebugBytes(&self.output))
1269 .field("state", &self.state)
1270 .field("marker", &self.marker)
1271 .field("max_name_len", &self.max_name_len)
1272 .field("protocol_version", &self.protocol_version)
1273 .finish()
1274 }
1275}