questdb/ingress/
buffer.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
25use crate::ingress::{
26    ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos,
27    ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT,
28};
29use crate::{error, Error};
30use std::fmt::{Debug, Formatter};
31use std::num::NonZeroUsize;
32use std::slice::from_raw_parts_mut;
33
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` once before and once after the
/// (escaped) payload is written. Callers use it to emit surrounding quote
/// characters, or pass a no-op for unquoted values.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // First pass: count the escapes so the exact final size can be reserved.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        // Fast path: nothing needs escaping, copy the payload in one go.
        output.extend_from_slice(s.as_bytes());
    } else {
        // One up-front reservation means the pushes below never reallocate,
        // while keeping the vector's length in sync with its initialized
        // contents at every step (no `set_len` over unwritten memory).
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
73
/// Tells whether byte `c` must be preceded by a backslash when written as
/// part of an unquoted value (space, comma, equals sign, line breaks and the
/// backslash itself).
fn must_escape_unquoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b" ,=\n\r\\";
    SPECIAL.contains(&c)
}
77
/// Tells whether byte `c` must be preceded by a backslash when written inside
/// a double-quoted value (line breaks, the double quote and the backslash).
fn must_escape_quoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b"\n\r\"\\";
    SPECIAL.contains(&c)
}
81
82fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
83    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
84}
85
86fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
87    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
88}
89
90pub(crate) struct F64Serializer {
91    buf: ryu::Buffer,
92    n: f64,
93}
94
95impl F64Serializer {
96    pub(crate) fn new(n: f64) -> Self {
97        F64Serializer {
98            buf: ryu::Buffer::new(),
99            n,
100        }
101    }
102
103    // This function was taken and customized from the ryu crate.
104    #[cold]
105    fn format_nonfinite(&self) -> &'static str {
106        const MANTISSA_MASK: u64 = 0x000fffffffffffff;
107        const SIGN_MASK: u64 = 0x8000000000000000;
108        let bits = self.n.to_bits();
109        if bits & MANTISSA_MASK != 0 {
110            "NaN"
111        } else if bits & SIGN_MASK != 0 {
112            "-Infinity"
113        } else {
114            "Infinity"
115        }
116    }
117
118    pub(crate) fn as_str(&mut self) -> &str {
119        if self.n.is_finite() {
120            self.buf.format_finite(self.n)
121        } else {
122            self.format_nonfinite()
123        }
124    }
125}
126
/// A single buffer API operation.
///
/// Every variant occupies its own bit so that a set of permitted operations
/// can be expressed as the bitwise OR of several variants.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// Lower-case name of the operation, as embedded in state error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Table => "table",
            Self::Symbol => "symbol",
            Self::Column => "column",
            Self::At => "at",
            Self::Flush => "flush",
        }
    }
}
147
148#[derive(Debug, Copy, Clone, PartialEq)]
149enum OpCase {
150    Init = Op::Table as isize,
151    TableWritten = Op::Symbol as isize | Op::Column as isize,
152    SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
153    ColumnWritten = Op::Column as isize | Op::At as isize,
154    MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
155}
156
157impl OpCase {
158    fn next_op_descr(self) -> &'static str {
159        match self {
160            OpCase::Init => "should have called `table` instead",
161            OpCase::TableWritten => "should have called `symbol` or `column` instead",
162            OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
163            OpCase::ColumnWritten => "should have called `column` or `at` instead",
164            OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
165        }
166    }
167}
168
169// IMPORTANT: This struct MUST remain `Copy` to ensure that
170// there are no heap allocations when performing marker operations.
171#[derive(Debug, Clone, Copy)]
172struct BufferState {
173    op_case: OpCase,
174    row_count: usize,
175    first_table_len: Option<NonZeroUsize>,
176    transactional: bool,
177}
178
179impl BufferState {
180    fn new() -> Self {
181        Self {
182            op_case: OpCase::Init,
183            row_count: 0,
184            first_table_len: None,
185            transactional: true,
186        }
187    }
188}
189
190/// A validated table name.
191///
192/// This type simply wraps a `&str`.
193///
194/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
195/// it doesn't have to validate it again. This saves CPU cycles.
196#[derive(Clone, Copy)]
197pub struct TableName<'a> {
198    name: &'a str,
199}
200
201impl<'a> TableName<'a> {
202    /// Construct a validated table name.
203    pub fn new(name: &'a str) -> crate::Result<Self> {
204        if name.is_empty() {
205            return Err(error::fmt!(
206                InvalidName,
207                "Table names must have a non-zero length."
208            ));
209        }
210
211        let mut prev = '\0';
212        for (index, c) in name.chars().enumerate() {
213            match c {
214                '.' => {
215                    if index == 0 || index == name.len() - 1 || prev == '.' {
216                        return Err(error::fmt!(
217                            InvalidName,
218                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
219                            name,
220                            index
221                        ));
222                    }
223                }
224                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
225                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
226                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
227                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
228                    return Err(error::fmt!(
229                        InvalidName,
230                        concat!(
231                            "Bad string {:?}: ",
232                            "Table names can't contain ",
233                            "a {:?} character, which was found at ",
234                            "byte position {}."
235                        ),
236                        name,
237                        c,
238                        index
239                    ));
240                }
241                '\u{feff}' => {
242                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
243                    // aka UTF-8 BOM if it appears anywhere in the string.
244                    return Err(error::fmt!(
245                        InvalidName,
246                        concat!(
247                            "Bad string {:?}: ",
248                            "Table names can't contain ",
249                            "a UTF-8 BOM character, which was found at ",
250                            "byte position {}."
251                        ),
252                        name,
253                        index
254                    ));
255                }
256                _ => (),
257            }
258            prev = c;
259        }
260
261        Ok(Self { name })
262    }
263
264    /// Construct a table name without validating it.
265    ///
266    /// This breaks API encapsulation and is only intended for use
267    /// when the string was already previously validated.
268    ///
269    /// The QuestDB server will reject an invalid table name.
270    pub fn new_unchecked(name: &'a str) -> Self {
271        Self { name }
272    }
273}
274
275impl<'a> TryFrom<&'a str> for TableName<'a> {
276    type Error = Error;
277
278    fn try_from(name: &'a str) -> crate::Result<Self> {
279        Self::new(name)
280    }
281}
282
283impl AsRef<str> for TableName<'_> {
284    fn as_ref(&self) -> &str {
285        self.name
286    }
287}
288
289/// A validated column name.
290///
291/// This type simply wraps a `&str`.
292///
293/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
294/// it doesn't have to validate it again. This saves CPU cycles.
295#[derive(Clone, Copy)]
296pub struct ColumnName<'a> {
297    name: &'a str,
298}
299
300impl<'a> ColumnName<'a> {
301    /// Construct a validated table name.
302    pub fn new(name: &'a str) -> crate::Result<Self> {
303        if name.is_empty() {
304            return Err(error::fmt!(
305                InvalidName,
306                "Column names must have a non-zero length."
307            ));
308        }
309
310        for (index, c) in name.chars().enumerate() {
311            match c {
312                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
313                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
314                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
315                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
316                    return Err(error::fmt!(
317                        InvalidName,
318                        concat!(
319                            "Bad string {:?}: ",
320                            "Column names can't contain ",
321                            "a {:?} character, which was found at ",
322                            "byte position {}."
323                        ),
324                        name,
325                        c,
326                        index
327                    ));
328                }
329                '\u{FEFF}' => {
330                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
331                    // aka UTF-8 BOM if it appears anywhere in the string.
332                    return Err(error::fmt!(
333                        InvalidName,
334                        concat!(
335                            "Bad string {:?}: ",
336                            "Column names can't contain ",
337                            "a UTF-8 BOM character, which was found at ",
338                            "byte position {}."
339                        ),
340                        name,
341                        index
342                    ));
343                }
344                _ => (),
345            }
346        }
347
348        Ok(Self { name })
349    }
350
351    /// Construct a column name without validating it.
352    ///
353    /// This breaks API encapsulation and is only intended for use
354    /// when the string was already previously validated.
355    ///
356    /// The QuestDB server will reject an invalid column name.
357    pub fn new_unchecked(name: &'a str) -> Self {
358        Self { name }
359    }
360}
361
362impl<'a> TryFrom<&'a str> for ColumnName<'a> {
363    type Error = Error;
364
365    fn try_from(name: &'a str) -> crate::Result<Self> {
366        Self::new(name)
367    }
368}
369
370impl AsRef<str> for ColumnName<'_> {
371    fn as_ref(&self) -> &str {
372        self.name
373    }
374}
375
376/// A reusable buffer to prepare a batch of ILP messages.
377///
378/// # Example
379///
380/// ```no_run
381/// # use questdb::Result;
382/// # use questdb::ingress::SenderBuilder;
383///
384/// # fn main() -> Result<()> {
385/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
386/// # use questdb::Result;
387/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
388/// let mut buffer = sender.new_buffer();
389///
390/// // first row
391/// buffer
392///     .table("table1")?
393///     .symbol("bar", "baz")?
394///     .column_bool("a", false)?
395///     .column_i64("b", 42)?
396///     .column_f64("c", 3.14)?
397///     .column_str("d", "hello")?
398///     .column_ts("e", TimestampMicros::now())?
399///     .at(TimestampNanos::now())?;
400///
401/// // second row
402/// buffer
403///     .table("table2")?
404///     .symbol("foo", "bar")?
405///     .at(TimestampNanos::now())?;
406/// # Ok(())
407/// # }
408/// ```
409///
410/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
411///
412/// # Sequential Coupling
413/// The Buffer API is sequentially coupled:
414///   * A row always starts with [`table`](Buffer::table).
415///   * A row must contain at least one [`symbol`](Buffer::symbol) or
416///     column (
417///     [`column_bool`](Buffer::column_bool),
418///     [`column_i64`](Buffer::column_i64),
419///     [`column_f64`](Buffer::column_f64),
420///     [`column_str`](Buffer::column_str),
421///     [`column_arr`](Buffer::column_arr),
422///     [`column_ts`](Buffer::column_ts)).
423///   * Symbols must appear before columns.
424///   * A row must be terminated with either [`at`](Buffer::at) or
425///     [`at_now`](Buffer::at_now).
426///
427/// This diagram visualizes the sequence:
428///
429/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
430///
431/// # Buffer method calls, Serialized ILP types and QuestDB types
432///
433/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
434/// |---------------|--------------------------------------------------------------|
435/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
436/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
437/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
438/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
439/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
440/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
441/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
442///
443/// QuestDB supports both `STRING` and `SYMBOL` column types.
444///
445/// To understand the difference, refer to the
446/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
447/// symbols are interned strings, most suitable for identifiers that are repeated many
448/// times throughout the column. They offer an advantage in storage space and query
449/// performance.
450///
451/// # Inserting NULL values
452///
453/// To insert a NULL value, skip the symbol or column for that row.
454///
455/// # Recovering from validation errors
456///
457/// If you want to recover from potential validation errors, call
458/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
459/// append as many rows or parts of rows as you like, and then call
460/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
461///
462/// If there was an error in one of the rows, use
463/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
464/// marked last known good state.
465///
466#[derive(Clone)]
467pub struct Buffer {
468    output: Vec<u8>,
469    state: BufferState,
470    marker: Option<(usize, BufferState)>,
471    max_name_len: usize,
472    protocol_version: ProtocolVersion,
473}
474
475impl Buffer {
476    /// Creates a new [`Buffer`] with default parameters.
477    ///
478    /// - Uses the specified protocol version
479    /// - Sets maximum name length to **127 characters** (QuestDB server default)
480    ///
481    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
482    /// and [`Sender::max_name_len`] is 127.
483    ///
484    /// For custom name lengths, use [`Self::with_max_name_len`]
485    pub fn new(protocol_version: ProtocolVersion) -> Self {
486        Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
487    }
488
489    /// Creates a new [`Buffer`] with a custom maximum name length.
490    ///
491    /// - `max_name_len`: Maximum allowed length for table/column names, match
492    ///   your QuestDB server's `cairo.max.file.name.length` configuration
493    /// - `protocol_version`: Protocol version to use
494    ///
495    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
496    /// and [`Sender::max_name_len`].
497    ///
498    /// For the default max name length limit (127), use [`Self::new`].
499    pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
500        Self {
501            output: Vec::new(),
502            state: BufferState::new(),
503            marker: None,
504            max_name_len,
505            protocol_version,
506        }
507    }
508
509    pub fn protocol_version(&self) -> ProtocolVersion {
510        self.protocol_version
511    }
512
513    /// Pre-allocate to ensure the buffer has enough capacity for at least the
514    /// specified additional byte count. This may be rounded up.
515    /// This does not allocate if such additional capacity is already satisfied.
516    /// See: `capacity`.
517    pub fn reserve(&mut self, additional: usize) {
518        self.output.reserve(additional);
519    }
520
521    /// The number of bytes accumulated in the buffer.
522    pub fn len(&self) -> usize {
523        self.output.len()
524    }
525
526    /// The number of rows accumulated in the buffer.
527    pub fn row_count(&self) -> usize {
528        self.state.row_count
529    }
530
531    /// Tells whether the buffer is transactional. It is transactional iff it contains
532    /// data for at most one table. Additionally, you must send the buffer over HTTP to
533    /// get transactional behavior.
534    pub fn transactional(&self) -> bool {
535        self.state.transactional
536    }
537
538    pub fn is_empty(&self) -> bool {
539        self.output.is_empty()
540    }
541
542    /// The total number of bytes the buffer can hold before it needs to resize.
543    pub fn capacity(&self) -> usize {
544        self.output.capacity()
545    }
546
547    pub fn as_bytes(&self) -> &[u8] {
548        &self.output
549    }
550
551    /// Mark a rewind point.
552    /// This allows undoing accumulated changes to the buffer for one or more
553    /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
554    /// Any previous marker will be discarded.
555    /// Once the marker is no longer needed, call
556    /// [`clear_marker`](Buffer::clear_marker).
557    pub fn set_marker(&mut self) -> crate::Result<()> {
558        if (self.state.op_case as isize & Op::Table as isize) == 0 {
559            return Err(error::fmt!(
560                InvalidApiCall,
561                concat!(
562                    "Can't set the marker whilst constructing a line. ",
563                    "A marker may only be set on an empty buffer or after ",
564                    "`at` or `at_now` is called."
565                )
566            ));
567        }
568        self.marker = Some((self.output.len(), self.state));
569        Ok(())
570    }
571
572    /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
573    /// call.
574    ///
575    /// As a side effect, this also clears the marker.
576    pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
577        if let Some((position, state)) = self.marker.take() {
578            self.output.truncate(position);
579            self.state = state;
580            Ok(())
581        } else {
582            Err(error::fmt!(
583                InvalidApiCall,
584                "Can't rewind to the marker: No marker set."
585            ))
586        }
587    }
588
589    /// Discard any marker as may have been set by
590    /// [`set_marker`](Buffer::set_marker).
591    ///
592    /// Idempotent.
593    pub fn clear_marker(&mut self) {
594        self.marker = None;
595    }
596
597    /// Reset the buffer and clear contents whilst retaining
598    /// [`capacity`](Buffer::capacity).
599    pub fn clear(&mut self) {
600        self.output.clear();
601        self.state = BufferState::new();
602        self.marker = None;
603    }
604
605    /// Check if the next API operation is allowed as per the OP case state machine.
606    #[inline(always)]
607    fn check_op(&self, op: Op) -> crate::Result<()> {
608        if (self.state.op_case as isize & op as isize) > 0 {
609            Ok(())
610        } else {
611            Err(error::fmt!(
612                InvalidApiCall,
613                "State error: Bad call to `{}`, {}.",
614                op.descr(),
615                self.state.op_case.next_op_descr()
616            ))
617        }
618    }
619
620    /// Checks if this buffer is ready to be flushed to a sender via one of the
621    /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
622    /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
623    /// message indicating why this [`Buffer`] cannot be flushed at the moment.
624    #[inline(always)]
625    pub fn check_can_flush(&self) -> crate::Result<()> {
626        self.check_op(Op::Flush)
627    }
628
629    #[inline(always)]
630    fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
631        if name.len() > self.max_name_len {
632            return Err(error::fmt!(
633                InvalidName,
634                "Bad name: {:?}: Too long (max {} characters)",
635                name,
636                self.max_name_len
637            ));
638        }
639        Ok(())
640    }
641
642    /// Begin recording a new row for the given table.
643    ///
644    /// ```no_run
645    /// # use questdb::Result;
646    /// # use questdb::ingress::{Buffer, SenderBuilder};
647    /// # fn main() -> Result<()> {
648    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
649    /// # let mut buffer = sender.new_buffer();
650    /// buffer.table("table_name")?;
651    /// # Ok(())
652    /// # }
653    /// ```
654    ///
655    /// or
656    ///
657    /// ```no_run
658    /// # use questdb::Result;
659    /// # use questdb::ingress::{Buffer, SenderBuilder};
660    /// use questdb::ingress::TableName;
661    ///
662    /// # fn main() -> Result<()> {
663    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
664    /// # let mut buffer = sender.new_buffer();
665    /// let table_name = TableName::new("table_name")?;
666    /// buffer.table(table_name)?;
667    /// # Ok(())
668    /// # }
669    /// ```
670    pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
671    where
672        N: TryInto<TableName<'a>>,
673        Error: From<N::Error>,
674    {
675        let name: TableName<'a> = name.try_into()?;
676        self.validate_max_name_len(name.name)?;
677        self.check_op(Op::Table)?;
678        let table_begin = self.output.len();
679        write_escaped_unquoted(&mut self.output, name.name);
680        let table_end = self.output.len();
681        self.state.op_case = OpCase::TableWritten;
682
683        // A buffer stops being transactional if it targets multiple tables.
684        if let Some(first_table_len) = &self.state.first_table_len {
685            let first_table = &self.output[0..first_table_len.get()];
686            let this_table = &self.output[table_begin..table_end];
687            if first_table != this_table {
688                self.state.transactional = false;
689            }
690        } else {
691            debug_assert!(table_begin == 0);
692
693            // This is a bit confusing, so worth explaining:
694            // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
695            // but we know that `table_end` is never 0 here, we just need an option type
696            // anyway, so we don't bother unwrapping it to then wrap it again.
697            let first_table_len = NonZeroUsize::new(table_end);
698
699            // Instead we just assert that it's `Some`.
700            debug_assert!(first_table_len.is_some());
701
702            self.state.first_table_len = first_table_len;
703        }
704        Ok(self)
705    }
706
707    /// Record a symbol for the given column.
708    /// Make sure you record all symbol columns before any other column type.
709    ///
710    /// ```no_run
711    /// # use questdb::Result;
712    /// # use questdb::ingress::{Buffer, SenderBuilder};
713    /// # fn main() -> Result<()> {
714    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
715    /// # let mut buffer = sender.new_buffer();
716    /// # buffer.table("x")?;
717    /// buffer.symbol("col_name", "value")?;
718    /// # Ok(())
719    /// # }
720    /// ```
721    ///
722    /// or
723    ///
724    /// ```no_run
725    /// # use questdb::Result;
726    /// # use questdb::ingress::{Buffer, SenderBuilder};
727    /// # fn main() -> Result<()> {
728    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
729    /// # let mut buffer = sender.new_buffer();
730    /// # buffer.table("x")?;
731    /// let value: String = "value".to_owned();
732    /// buffer.symbol("col_name", value)?;
733    /// # Ok(())
734    /// # }
735    /// ```
736    ///
737    /// or
738    ///
739    /// ```no_run
740    /// # use questdb::Result;
741    /// # use questdb::ingress::{Buffer, SenderBuilder};
742    /// use questdb::ingress::ColumnName;
743    ///
744    /// # fn main() -> Result<()> {
745    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
746    /// # let mut buffer = sender.new_buffer();
747    /// # buffer.table("x")?;
748    /// let col_name = ColumnName::new("col_name")?;
749    /// buffer.symbol(col_name, "value")?;
750    /// # Ok(())
751    /// # }
752    /// ```
753    ///
754    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
755    where
756        N: TryInto<ColumnName<'a>>,
757        S: AsRef<str>,
758        Error: From<N::Error>,
759    {
760        let name: ColumnName<'a> = name.try_into()?;
761        self.validate_max_name_len(name.name)?;
762        self.check_op(Op::Symbol)?;
763        self.output.push(b',');
764        write_escaped_unquoted(&mut self.output, name.name);
765        self.output.push(b'=');
766        write_escaped_unquoted(&mut self.output, value.as_ref());
767        self.state.op_case = OpCase::SymbolWritten;
768        Ok(self)
769    }
770
771    fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
772    where
773        N: TryInto<ColumnName<'a>>,
774        Error: From<N::Error>,
775    {
776        let name: ColumnName<'a> = name.try_into()?;
777        self.validate_max_name_len(name.name)?;
778        self.check_op(Op::Column)?;
779        self.output
780            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
781                b' '
782            } else {
783                b','
784            });
785        write_escaped_unquoted(&mut self.output, name.name);
786        self.output.push(b'=');
787        self.state.op_case = OpCase::ColumnWritten;
788        Ok(self)
789    }
790
791    /// Record a boolean value for the given column.
792    ///
793    /// ```no_run
794    /// # use questdb::Result;
795    /// # use questdb::ingress::{Buffer, SenderBuilder};
796    /// # fn main() -> Result<()> {
797    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
798    /// # let mut buffer = sender.new_buffer();
799    /// # buffer.table("x")?;
800    /// buffer.column_bool("col_name", true)?;
801    /// # Ok(())
802    /// # }
803    /// ```
804    ///
805    /// or
806    ///
807    /// ```no_run
808    /// # use questdb::Result;
809    /// # use questdb::ingress::{Buffer, SenderBuilder};
810    /// use questdb::ingress::ColumnName;
811    ///
812    /// # fn main() -> Result<()> {
813    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
814    /// # let mut buffer = sender.new_buffer();
815    /// # buffer.table("x")?;
816    /// let col_name = ColumnName::new("col_name")?;
817    /// buffer.column_bool(col_name, true)?;
818    /// # Ok(())
819    /// # }
820    /// ```
821    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
822    where
823        N: TryInto<ColumnName<'a>>,
824        Error: From<N::Error>,
825    {
826        self.write_column_key(name)?;
827        self.output.push(if value { b't' } else { b'f' });
828        Ok(self)
829    }
830
831    /// Record an integer value for the given column.
832    ///
833    /// ```no_run
834    /// # use questdb::Result;
835    /// # use questdb::ingress::{Buffer, SenderBuilder};
836    /// # fn main() -> Result<()> {
837    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
838    /// # let mut buffer = sender.new_buffer();
839    /// # buffer.table("x")?;
840    /// buffer.column_i64("col_name", 42)?;
841    /// # Ok(())
842    /// # }
843    /// ```
844    ///
845    /// or
846    ///
847    /// ```no_run
848    /// # use questdb::Result;
849    /// # use questdb::ingress::{Buffer, SenderBuilder};
850    /// use questdb::ingress::ColumnName;
851    ///
852    /// # fn main() -> Result<()> {
853    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
854    /// # let mut buffer = sender.new_buffer();
855    /// # buffer.table("x")?;
856    /// let col_name = ColumnName::new("col_name")?;
857    /// buffer.column_i64(col_name, 42)?;
858    /// # Ok(())
859    /// # }
860    /// ```
861    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
862    where
863        N: TryInto<ColumnName<'a>>,
864        Error: From<N::Error>,
865    {
866        self.write_column_key(name)?;
867        let mut buf = itoa::Buffer::new();
868        let printed = buf.format(value);
869        self.output.extend_from_slice(printed.as_bytes());
870        self.output.push(b'i');
871        Ok(self)
872    }
873
874    /// Record a floating point value for the given column.
875    ///
876    /// ```no_run
877    /// # use questdb::Result;
878    /// # use questdb::ingress::{Buffer, SenderBuilder};
879    /// # fn main() -> Result<()> {
880    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
881    /// # let mut buffer = sender.new_buffer();
882    /// # buffer.table("x")?;
883    /// buffer.column_f64("col_name", 3.14)?;
884    /// # Ok(())
885    /// # }
886    /// ```
887    ///
888    /// or
889    ///
890    /// ```no_run
891    /// # use questdb::Result;
892    /// # use questdb::ingress::{Buffer, SenderBuilder};
893    /// use questdb::ingress::ColumnName;
894    ///
895    /// # fn main() -> Result<()> {
896    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
897    /// # let mut buffer = sender.new_buffer();
898    /// # buffer.table("x")?;
899    /// let col_name = ColumnName::new("col_name")?;
900    /// buffer.column_f64(col_name, 3.14)?;
901    /// # Ok(())
902    /// # }
903    /// ```
904    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
905    where
906        N: TryInto<ColumnName<'a>>,
907        Error: From<N::Error>,
908    {
909        self.write_column_key(name)?;
910        if !matches!(self.protocol_version, ProtocolVersion::V1) {
911            self.output.push(b'=');
912            self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
913            self.output.extend_from_slice(&value.to_le_bytes())
914        } else {
915            let mut ser = F64Serializer::new(value);
916            self.output.extend_from_slice(ser.as_str().as_bytes())
917        }
918        Ok(self)
919    }
920
921    /// Record a string value for the given column.
922    ///
923    /// ```no_run
924    /// # use questdb::Result;
925    /// # use questdb::ingress::{Buffer, SenderBuilder};
926    /// # fn main() -> Result<()> {
927    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
928    /// # let mut buffer = sender.new_buffer();
929    /// # buffer.table("x")?;
930    /// buffer.column_str("col_name", "value")?;
931    /// # Ok(())
932    /// # }
933    /// ```
934    ///
935    /// or
936    ///
937    /// ```no_run
938    /// # use questdb::Result;
939    /// # use questdb::ingress::{Buffer, SenderBuilder};
940    /// # fn main() -> Result<()> {
941    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
942    /// # let mut buffer = sender.new_buffer();
943    /// # buffer.table("x")?;
944    /// let value: String = "value".to_owned();
945    /// buffer.column_str("col_name", value)?;
946    /// # Ok(())
947    /// # }
948    /// ```
949    ///
950    /// or
951    ///
952    /// ```no_run
953    /// # use questdb::Result;
954    /// # use questdb::ingress::{Buffer, SenderBuilder};
955    /// use questdb::ingress::ColumnName;
956    ///
957    /// # fn main() -> Result<()> {
958    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
959    /// # let mut buffer = sender.new_buffer();
960    /// # buffer.table("x")?;
961    /// let col_name = ColumnName::new("col_name")?;
962    /// buffer.column_str(col_name, "value")?;
963    /// # Ok(())
964    /// # }
965    /// ```
966    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
967    where
968        N: TryInto<ColumnName<'a>>,
969        S: AsRef<str>,
970        Error: From<N::Error>,
971    {
972        self.write_column_key(name)?;
973        write_escaped_quoted(&mut self.output, value.as_ref());
974        Ok(self)
975    }
976
977    /// Record a multidimensional array value for the given column.
978    ///
979    /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
980    /// be of type `f64`, which is currently the only supported data type.
981    ///
982    /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
983    ///
984    /// # Examples
985    ///
986    /// Recording a 2D array using slices:
987    ///
988    /// ```no_run
989    /// # use questdb::Result;
990    /// # use questdb::ingress::{Buffer, SenderBuilder};
991    /// # fn main() -> Result<()> {
992    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
993    /// # let mut buffer = sender.new_buffer();
994    /// # buffer.table("x")?;
995    /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
996    /// buffer.column_arr("array_col", &array_2d)?;
997    /// # Ok(())
998    /// # }
999    /// ```
1000    ///
1001    /// Recording a 3D array using vectors:
1002    ///
1003    /// ```no_run
1004    /// # use questdb::Result;
1005    /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
1006    /// # fn main() -> Result<()> {
1007    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1008    /// # let mut buffer = sender.new_buffer();
1009    /// # buffer.table("x1")?;
1010    /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
1011    /// let col_name = ColumnName::new("col1")?;
1012    /// buffer.column_arr(col_name, &array_3d)?;
1013    /// # Ok(())
1014    /// # }
1015    /// ```
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns [`Error`] if:
1020    /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
1021    /// - Failed to get dimension sizes
1022    /// - Column name validation fails
1023    /// - Protocol version v1 is used (arrays require v2+)
1024    #[allow(private_bounds)]
1025    pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
1026    where
1027        N: TryInto<ColumnName<'a>>,
1028        T: NdArrayView<D>,
1029        D: ArrayElement + ArrayElementSealed,
1030        Error: From<N::Error>,
1031    {
1032        if self.protocol_version == ProtocolVersion::V1 {
1033            return Err(error::fmt!(
1034                ProtocolVersionError,
1035                "Protocol version v1 does not support array datatype",
1036            ));
1037        }
1038        let ndim = view.ndim();
1039        if ndim == 0 {
1040            return Err(error::fmt!(
1041                ArrayError,
1042                "Zero-dimensional arrays are not supported",
1043            ));
1044        }
1045
1046        // check dimension less equal than max dims
1047        if MAX_ARRAY_DIMS < ndim {
1048            return Err(error::fmt!(
1049                ArrayError,
1050                "Array dimension mismatch: expected at most {} dimensions, but got {}",
1051                MAX_ARRAY_DIMS,
1052                ndim
1053            ));
1054        }
1055
1056        let array_buf_size = check_and_get_array_bytes_size(view)?;
1057        self.write_column_key(name)?;
1058        // binary format flag '='
1059        self.output.push(b'=');
1060        // binary format entity type
1061        self.output.push(ARRAY_BINARY_FORMAT_TYPE);
1062        // ndarr datatype
1063        self.output.push(D::type_tag());
1064        // ndarr dims
1065        self.output.push(ndim as u8);
1066
1067        let dim_header_size = size_of::<u32>() * ndim;
1068        self.output.reserve(dim_header_size + array_buf_size);
1069
1070        for i in 0..ndim {
1071            // ndarr shape
1072            self.output
1073                .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
1074        }
1075
1076        let index = self.output.len();
1077        let writeable =
1078            unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
1079
1080        // ndarr data
1081        ndarr::write_array_data(view, writeable, array_buf_size)?;
1082        unsafe { self.output.set_len(array_buf_size + index) }
1083        Ok(self)
1084    }
1085
1086    /// Record a timestamp value for the given column.
1087    ///
1088    /// ```no_run
1089    /// # use questdb::Result;
1090    /// # use questdb::ingress::{Buffer, SenderBuilder};
1091    /// use questdb::ingress::TimestampMicros;
1092    /// # fn main() -> Result<()> {
1093    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1094    /// # let mut buffer = sender.new_buffer();
1095    /// # buffer.table("x")?;
1096    /// buffer.column_ts("col_name", TimestampMicros::now())?;
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    ///
1101    /// or
1102    ///
1103    /// ```no_run
1104    /// # use questdb::Result;
1105    /// # use questdb::ingress::{Buffer, SenderBuilder};
1106    /// use questdb::ingress::TimestampMicros;
1107    ///
1108    /// # fn main() -> Result<()> {
1109    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1110    /// # let mut buffer = sender.new_buffer();
1111    /// # buffer.table("x")?;
1112    /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1113    /// # Ok(())
1114    /// # }
1115    /// ```
1116    ///
1117    /// or
1118    ///
1119    /// ```no_run
1120    /// # use questdb::Result;
1121    /// # use questdb::ingress::{Buffer, SenderBuilder};
1122    /// use questdb::ingress::TimestampMicros;
1123    /// use questdb::ingress::ColumnName;
1124    ///
1125    /// # fn main() -> Result<()> {
1126    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1127    /// # let mut buffer = sender.new_buffer();
1128    /// # buffer.table("x")?;
1129    /// let col_name = ColumnName::new("col_name")?;
1130    /// buffer.column_ts(col_name, TimestampMicros::now())?;
1131    /// # Ok(())
1132    /// # }
1133    /// ```
1134    ///
1135    /// or you can also pass in a `TimestampNanos`.
1136    ///
1137    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1138    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1139    ///
1140    /// This last option requires the `chrono_timestamp` feature.
1141    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1142    where
1143        N: TryInto<ColumnName<'a>>,
1144        T: TryInto<Timestamp>,
1145        Error: From<N::Error>,
1146        Error: From<T::Error>,
1147    {
1148        self.write_column_key(name)?;
1149        let timestamp: Timestamp = value.try_into()?;
1150        let mut buf = itoa::Buffer::new();
1151        match timestamp {
1152            Timestamp::Micros(ts) => {
1153                let printed = buf.format(ts.as_i64());
1154                self.output.extend_from_slice(printed.as_bytes());
1155                self.output.push(b't');
1156            }
1157            Timestamp::Nanos(ts) => {
1158                let printed = buf.format(ts.as_i64());
1159                self.output.extend_from_slice(printed.as_bytes());
1160                self.output.push(b'n');
1161            }
1162        }
1163        Ok(self)
1164    }
1165
1166    /// Complete the current row with the designated timestamp. After this call, you can
1167    /// start recording the next row by calling [Buffer::table] again, or  you can send
1168    /// the accumulated batch by calling [Sender::flush] or one of its variants.
1169    ///
1170    /// ```no_run
1171    /// # use questdb::Result;
1172    /// # use questdb::ingress::{Buffer, SenderBuilder};
1173    /// use questdb::ingress::TimestampNanos;
1174    /// # fn main() -> Result<()> {
1175    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1176    /// # let mut buffer = sender.new_buffer();
1177    /// # buffer.table("x")?.symbol("a", "b")?;
1178    /// buffer.at(TimestampNanos::now())?;
1179    /// # Ok(())
1180    /// # }
1181    /// ```
1182    ///
1183    /// or
1184    ///
1185    /// ```no_run
1186    /// # use questdb::Result;
1187    /// # use questdb::ingress::{Buffer, SenderBuilder};
1188    /// use questdb::ingress::TimestampNanos;
1189    ///
1190    /// # fn main() -> Result<()> {
1191    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1192    /// # let mut buffer = sender.new_buffer();
1193    /// # buffer.table("x")?.symbol("a", "b")?;
1194    /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1195    /// # Ok(())
1196    /// # }
1197    /// ```
1198    ///
1199    /// You can also pass in a `TimestampMicros`.
1200    ///
1201    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1202    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1203    ///
1204    pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1205    where
1206        T: TryInto<Timestamp>,
1207        Error: From<T::Error>,
1208    {
1209        self.check_op(Op::At)?;
1210        let timestamp: Timestamp = timestamp.try_into()?;
1211
1212        // https://github.com/rust-lang/rust/issues/115880
1213        let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1214        let timestamp: TimestampNanos = timestamp?;
1215
1216        let epoch_nanos = timestamp.as_i64();
1217        if epoch_nanos < 0 {
1218            return Err(error::fmt!(
1219                InvalidTimestamp,
1220                "Timestamp {} is negative. It must be >= 0.",
1221                epoch_nanos
1222            ));
1223        }
1224        let mut buf = itoa::Buffer::new();
1225        let printed = buf.format(epoch_nanos);
1226        self.output.push(b' ');
1227        self.output.extend_from_slice(printed.as_bytes());
1228        self.output.push(b'\n');
1229        self.state.op_case = OpCase::MayFlushOrTable;
1230        self.state.row_count += 1;
1231        Ok(())
1232    }
1233
1234    /// Complete the current row without providing a timestamp. The QuestDB instance
1235    /// will insert its own timestamp.
1236    ///
1237    /// Letting the server assign the timestamp can be faster since it reliably avoids
1238    /// out-of-order operations in the database for maximum ingestion throughput. However,
1239    /// it removes the ability to deduplicate rows.
1240    ///
1241    /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1242    /// server will set the timestamp only after receiving the row. If you're flushing
1243    /// infrequently, the server-assigned timestamp may be significantly behind the
1244    /// time the data was recorded in the buffer.
1245    ///
1246    /// In almost all cases, you should prefer the [Buffer::at] function.
1247    ///
1248    /// After this call, you can start recording the next row by calling [Buffer::table]
1249    /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1250    /// its variants.
1251    ///
1252    /// ```no_run
1253    /// # use questdb::Result;
1254    /// # use questdb::ingress::{Buffer, SenderBuilder};
1255    /// # fn main() -> Result<()> {
1256    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1257    /// # let mut buffer = sender.new_buffer();
1258    /// # buffer.table("x")?.symbol("a", "b")?;
1259    /// buffer.at_now()?;
1260    /// # Ok(())
1261    /// # }
1262    /// ```
1263    pub fn at_now(&mut self) -> crate::Result<()> {
1264        self.check_op(Op::At)?;
1265        self.output.push(b'\n');
1266        self.state.op_case = OpCase::MayFlushOrTable;
1267        self.state.row_count += 1;
1268        Ok(())
1269    }
1270}
1271
1272impl Debug for Buffer {
1273    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1274        f.debug_struct("Buffer")
1275            .field("output", &DebugBytes(&self.output))
1276            .field("state", &self.state)
1277            .field("marker", &self.marker)
1278            .field("max_name_len", &self.max_name_len)
1279            .field("protocol_version", &self.protocol_version)
1280            .finish()
1281    }
1282}