questdb/ingress/
buffer.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
25use crate::ingress::{
26    ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros,
27    TimestampNanos, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS,
28    MAX_NAME_LEN_DEFAULT,
29};
30use crate::{error, Error};
31use std::fmt::{Debug, Formatter};
32use std::num::NonZeroUsize;
33use std::slice::from_raw_parts_mut;
34
35fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
36where
37    C: Fn(u8) -> bool,
38    Q: Fn(&mut Vec<u8>),
39{
40    let mut to_escape = 0usize;
41    for b in s.bytes() {
42        if check_escape_fn(b) {
43            to_escape += 1;
44        }
45    }
46
47    quoting_fn(output);
48
49    if to_escape == 0 {
50        // output.push_str(s);
51        output.extend_from_slice(s.as_bytes());
52    } else {
53        let additional = s.len() + to_escape;
54        output.reserve(additional);
55        let mut index = output.len();
56        unsafe { output.set_len(index + additional) };
57        for b in s.bytes() {
58            if check_escape_fn(b) {
59                unsafe {
60                    *output.get_unchecked_mut(index) = b'\\';
61                }
62                index += 1;
63            }
64
65            unsafe {
66                *output.get_unchecked_mut(index) = b;
67            }
68            index += 1;
69        }
70    }
71
72    quoting_fn(output);
73}
74
75fn must_escape_unquoted(c: u8) -> bool {
76    matches!(c, b' ' | b',' | b'=' | b'\n' | b'\r' | b'\\')
77}
78
79fn must_escape_quoted(c: u8) -> bool {
80    matches!(c, b'\n' | b'\r' | b'"' | b'\\')
81}
82
83fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
84    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
85}
86
87fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
88    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
89}
90
91pub(crate) struct F64Serializer {
92    buf: ryu::Buffer,
93    n: f64,
94}
95
96impl F64Serializer {
97    pub(crate) fn new(n: f64) -> Self {
98        F64Serializer {
99            buf: ryu::Buffer::new(),
100            n,
101        }
102    }
103
104    // This function was taken and customized from the ryu crate.
105    #[cold]
106    fn format_nonfinite(&self) -> &'static str {
107        const MANTISSA_MASK: u64 = 0x000fffffffffffff;
108        const SIGN_MASK: u64 = 0x8000000000000000;
109        let bits = self.n.to_bits();
110        if bits & MANTISSA_MASK != 0 {
111            "NaN"
112        } else if bits & SIGN_MASK != 0 {
113            "-Infinity"
114        } else {
115            "Infinity"
116        }
117    }
118
119    pub(crate) fn as_str(&mut self) -> &str {
120        if self.n.is_finite() {
121            self.buf.format_finite(self.n)
122        } else {
123            self.format_nonfinite()
124        }
125    }
126}
127
128#[derive(Debug, Copy, Clone)]
129enum Op {
130    Table = 1,
131    Symbol = 1 << 1,
132    Column = 1 << 2,
133    At = 1 << 3,
134    Flush = 1 << 4,
135}
136
137impl Op {
138    fn descr(self) -> &'static str {
139        match self {
140            Op::Table => "table",
141            Op::Symbol => "symbol",
142            Op::Column => "column",
143            Op::At => "at",
144            Op::Flush => "flush",
145        }
146    }
147}
148
149#[derive(Debug, Copy, Clone, PartialEq)]
150enum OpCase {
151    Init = Op::Table as isize,
152    TableWritten = Op::Symbol as isize | Op::Column as isize,
153    SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
154    ColumnWritten = Op::Column as isize | Op::At as isize,
155    MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
156}
157
158impl OpCase {
159    fn next_op_descr(self) -> &'static str {
160        match self {
161            OpCase::Init => "should have called `table` instead",
162            OpCase::TableWritten => "should have called `symbol` or `column` instead",
163            OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
164            OpCase::ColumnWritten => "should have called `column` or `at` instead",
165            OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
166        }
167    }
168}
169
170// IMPORTANT: This struct MUST remain `Copy` to ensure that
171// there are no heap allocations when performing marker operations.
172#[derive(Debug, Clone, Copy)]
173struct BufferState {
174    op_case: OpCase,
175    row_count: usize,
176    first_table_len: Option<NonZeroUsize>,
177    transactional: bool,
178}
179
180impl BufferState {
181    fn new() -> Self {
182        Self {
183            op_case: OpCase::Init,
184            row_count: 0,
185            first_table_len: None,
186            transactional: true,
187        }
188    }
189}
190
191/// A validated table name.
192///
193/// This type simply wraps a `&str`.
194///
195/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
196/// it doesn't have to validate it again. This saves CPU cycles.
197#[derive(Clone, Copy)]
198pub struct TableName<'a> {
199    name: &'a str,
200}
201
202impl<'a> TableName<'a> {
203    /// Construct a validated table name.
204    pub fn new(name: &'a str) -> crate::Result<Self> {
205        if name.is_empty() {
206            return Err(error::fmt!(
207                InvalidName,
208                "Table names must have a non-zero length."
209            ));
210        }
211
212        let mut prev = '\0';
213        for (index, c) in name.chars().enumerate() {
214            match c {
215                '.' => {
216                    if index == 0 || index == name.len() - 1 || prev == '.' {
217                        return Err(error::fmt!(
218                            InvalidName,
219                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
220                            name,
221                            index
222                        ));
223                    }
224                }
225                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
226                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
227                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
228                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
229                    return Err(error::fmt!(
230                        InvalidName,
231                        concat!(
232                            "Bad string {:?}: ",
233                            "Table names can't contain ",
234                            "a {:?} character, which was found at ",
235                            "byte position {}."
236                        ),
237                        name,
238                        c,
239                        index
240                    ));
241                }
242                '\u{feff}' => {
243                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
244                    // aka UTF-8 BOM if it appears anywhere in the string.
245                    return Err(error::fmt!(
246                        InvalidName,
247                        concat!(
248                            "Bad string {:?}: ",
249                            "Table names can't contain ",
250                            "a UTF-8 BOM character, which was found at ",
251                            "byte position {}."
252                        ),
253                        name,
254                        index
255                    ));
256                }
257                _ => (),
258            }
259            prev = c;
260        }
261
262        Ok(Self { name })
263    }
264
265    /// Construct a table name without validating it.
266    ///
267    /// This breaks API encapsulation and is only intended for use
268    /// when the string was already previously validated.
269    ///
270    /// The QuestDB server will reject an invalid table name.
271    pub fn new_unchecked(name: &'a str) -> Self {
272        Self { name }
273    }
274}
275
276impl<'a> TryFrom<&'a str> for TableName<'a> {
277    type Error = Error;
278
279    fn try_from(name: &'a str) -> crate::Result<Self> {
280        Self::new(name)
281    }
282}
283
284impl<'a> AsRef<str> for TableName<'a> {
285    fn as_ref(&self) -> &str {
286        self.name
287    }
288}
289
290/// A validated column name.
291///
292/// This type simply wraps a `&str`.
293///
294/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
295/// it doesn't have to validate it again. This saves CPU cycles.
296#[derive(Clone, Copy)]
297pub struct ColumnName<'a> {
298    name: &'a str,
299}
300
301impl<'a> ColumnName<'a> {
302    /// Construct a validated table name.
303    pub fn new(name: &'a str) -> crate::Result<Self> {
304        if name.is_empty() {
305            return Err(error::fmt!(
306                InvalidName,
307                "Column names must have a non-zero length."
308            ));
309        }
310
311        for (index, c) in name.chars().enumerate() {
312            match c {
313                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
314                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
315                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
316                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
317                    return Err(error::fmt!(
318                        InvalidName,
319                        concat!(
320                            "Bad string {:?}: ",
321                            "Column names can't contain ",
322                            "a {:?} character, which was found at ",
323                            "byte position {}."
324                        ),
325                        name,
326                        c,
327                        index
328                    ));
329                }
330                '\u{FEFF}' => {
331                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
332                    // aka UTF-8 BOM if it appears anywhere in the string.
333                    return Err(error::fmt!(
334                        InvalidName,
335                        concat!(
336                            "Bad string {:?}: ",
337                            "Column names can't contain ",
338                            "a UTF-8 BOM character, which was found at ",
339                            "byte position {}."
340                        ),
341                        name,
342                        index
343                    ));
344                }
345                _ => (),
346            }
347        }
348
349        Ok(Self { name })
350    }
351
352    /// Construct a column name without validating it.
353    ///
354    /// This breaks API encapsulation and is only intended for use
355    /// when the string was already previously validated.
356    ///
357    /// The QuestDB server will reject an invalid column name.
358    pub fn new_unchecked(name: &'a str) -> Self {
359        Self { name }
360    }
361}
362
363impl<'a> TryFrom<&'a str> for ColumnName<'a> {
364    type Error = Error;
365
366    fn try_from(name: &'a str) -> crate::Result<Self> {
367        Self::new(name)
368    }
369}
370
371impl<'a> AsRef<str> for ColumnName<'a> {
372    fn as_ref(&self) -> &str {
373        self.name
374    }
375}
376
377/// A reusable buffer to prepare a batch of ILP messages.
378///
379/// # Example
380///
381/// ```no_run
382/// # use questdb::Result;
383/// # use questdb::ingress::SenderBuilder;
384///
385/// # fn main() -> Result<()> {
386/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
387/// # use questdb::Result;
388/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
389/// let mut buffer = sender.new_buffer();
390///
391/// // first row
392/// buffer
393///     .table("table1")?
394///     .symbol("bar", "baz")?
395///     .column_bool("a", false)?
396///     .column_i64("b", 42)?
397///     .column_f64("c", 3.14)?
398///     .column_str("d", "hello")?
399///     .column_ts("e", TimestampMicros::now())?
400///     .at(TimestampNanos::now())?;
401///
402/// // second row
403/// buffer
404///     .table("table2")?
405///     .symbol("foo", "bar")?
406///     .at(TimestampNanos::now())?;
407/// # Ok(())
408/// # }
409/// ```
410///
411/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
412///
413/// # Sequential Coupling
414/// The Buffer API is sequentially coupled:
415///   * A row always starts with [`table`](Buffer::table).
416///   * A row must contain at least one [`symbol`](Buffer::symbol) or
417///     column (
418///     [`column_bool`](Buffer::column_bool),
419///     [`column_i64`](Buffer::column_i64),
420///     [`column_f64`](Buffer::column_f64),
421///     [`column_str`](Buffer::column_str),
422///     [`column_arr`](Buffer::column_arr),
423///     [`column_ts`](Buffer::column_ts)).
424///   * Symbols must appear before columns.
425///   * A row must be terminated with either [`at`](Buffer::at) or
426///     [`at_now`](Buffer::at_now).
427///
428/// This diagram visualizes the sequence:
429///
430/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
431///
432/// # Buffer method calls, Serialized ILP types and QuestDB types
433///
434/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
435/// |---------------|--------------------------------------------------------------|
436/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
437/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
438/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
439/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
440/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
441/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
442/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
443///
444/// QuestDB supports both `STRING` and `SYMBOL` column types.
445///
446/// To understand the difference, refer to the
447/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
448/// symbols are interned strings, most suitable for identifiers that are repeated many
449/// times throughout the column. They offer an advantage in storage space and query
450/// performance.
451///
452/// # Inserting NULL values
453///
454/// To insert a NULL value, skip the symbol or column for that row.
455///
456/// # Recovering from validation errors
457///
458/// If you want to recover from potential validation errors, call
459/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
460/// append as many rows or parts of rows as you like, and then call
461/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
462///
463/// If there was an error in one of the rows, use
464/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
465/// marked last known good state.
466///
467#[derive(Clone)]
468pub struct Buffer {
469    output: Vec<u8>,
470    state: BufferState,
471    marker: Option<(usize, BufferState)>,
472    max_name_len: usize,
473    protocol_version: ProtocolVersion,
474}
475
476impl Buffer {
477    /// Creates a new [`Buffer`] with default parameters.
478    ///
479    /// - Uses the specified protocol version
480    /// - Sets maximum name length to **127 characters** (QuestDB server default)
481    ///
482    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
483    /// and [`Sender::max_name_len`] is 127.
484    ///
485    /// For custom name lengths, use [`Self::with_max_name_len`]
486    pub fn new(protocol_version: ProtocolVersion) -> Self {
487        Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
488    }
489
490    /// Creates a new [`Buffer`] with a custom maximum name length.
491    ///
492    /// - `max_name_len`: Maximum allowed length for table/column names, match
493    ///   your QuestDB server's `cairo.max.file.name.length` configuration
494    /// - `protocol_version`: Protocol version to use
495    ///
496    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
497    /// and [`Sender::max_name_len`].
498    ///
499    /// For the default max name length limit (127), use [`Self::new`].
500    pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
501        Self {
502            output: Vec::new(),
503            state: BufferState::new(),
504            marker: None,
505            max_name_len,
506            protocol_version,
507        }
508    }
509
510    pub fn protocol_version(&self) -> ProtocolVersion {
511        self.protocol_version
512    }
513
514    /// Pre-allocate to ensure the buffer has enough capacity for at least the
515    /// specified additional byte count. This may be rounded up.
516    /// This does not allocate if such additional capacity is already satisfied.
517    /// See: `capacity`.
518    pub fn reserve(&mut self, additional: usize) {
519        self.output.reserve(additional);
520    }
521
522    /// The number of bytes accumulated in the buffer.
523    pub fn len(&self) -> usize {
524        self.output.len()
525    }
526
527    /// The number of rows accumulated in the buffer.
528    pub fn row_count(&self) -> usize {
529        self.state.row_count
530    }
531
532    /// Tells whether the buffer is transactional. It is transactional iff it contains
533    /// data for at most one table. Additionally, you must send the buffer over HTTP to
534    /// get transactional behavior.
535    pub fn transactional(&self) -> bool {
536        self.state.transactional
537    }
538
539    pub fn is_empty(&self) -> bool {
540        self.output.is_empty()
541    }
542
543    /// The total number of bytes the buffer can hold before it needs to resize.
544    pub fn capacity(&self) -> usize {
545        self.output.capacity()
546    }
547
548    pub fn as_bytes(&self) -> &[u8] {
549        &self.output
550    }
551
552    /// Mark a rewind point.
553    /// This allows undoing accumulated changes to the buffer for one or more
554    /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
555    /// Any previous marker will be discarded.
556    /// Once the marker is no longer needed, call
557    /// [`clear_marker`](Buffer::clear_marker).
558    pub fn set_marker(&mut self) -> crate::Result<()> {
559        if (self.state.op_case as isize & Op::Table as isize) == 0 {
560            return Err(error::fmt!(
561                InvalidApiCall,
562                concat!(
563                    "Can't set the marker whilst constructing a line. ",
564                    "A marker may only be set on an empty buffer or after ",
565                    "`at` or `at_now` is called."
566                )
567            ));
568        }
569        self.marker = Some((self.output.len(), self.state));
570        Ok(())
571    }
572
573    /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
574    /// call.
575    ///
576    /// As a side effect, this also clears the marker.
577    pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
578        if let Some((position, state)) = self.marker.take() {
579            self.output.truncate(position);
580            self.state = state;
581            Ok(())
582        } else {
583            Err(error::fmt!(
584                InvalidApiCall,
585                "Can't rewind to the marker: No marker set."
586            ))
587        }
588    }
589
590    /// Discard any marker as may have been set by
591    /// [`set_marker`](Buffer::set_marker).
592    ///
593    /// Idempotent.
594    pub fn clear_marker(&mut self) {
595        self.marker = None;
596    }
597
598    /// Reset the buffer and clear contents whilst retaining
599    /// [`capacity`](Buffer::capacity).
600    pub fn clear(&mut self) {
601        self.output.clear();
602        self.state = BufferState::new();
603        self.marker = None;
604    }
605
606    /// Check if the next API operation is allowed as per the OP case state machine.
607    #[inline(always)]
608    fn check_op(&self, op: Op) -> crate::Result<()> {
609        if (self.state.op_case as isize & op as isize) > 0 {
610            Ok(())
611        } else {
612            Err(error::fmt!(
613                InvalidApiCall,
614                "State error: Bad call to `{}`, {}.",
615                op.descr(),
616                self.state.op_case.next_op_descr()
617            ))
618        }
619    }
620
621    /// Checks if this buffer is ready to be flushed to a sender via one of the
622    /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
623    /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
624    /// message indicating why this [`Buffer`] cannot be flushed at the moment.
625    #[inline(always)]
626    pub fn check_can_flush(&self) -> crate::Result<()> {
627        self.check_op(Op::Flush)
628    }
629
630    #[inline(always)]
631    fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
632        if name.len() > self.max_name_len {
633            return Err(error::fmt!(
634                InvalidName,
635                "Bad name: {:?}: Too long (max {} characters)",
636                name,
637                self.max_name_len
638            ));
639        }
640        Ok(())
641    }
642
643    /// Begin recording a new row for the given table.
644    ///
645    /// ```no_run
646    /// # use questdb::Result;
647    /// # use questdb::ingress::{Buffer, SenderBuilder};
648    /// # fn main() -> Result<()> {
649    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
650    /// # let mut buffer = sender.new_buffer();
651    /// buffer.table("table_name")?;
652    /// # Ok(())
653    /// # }
654    /// ```
655    ///
656    /// or
657    ///
658    /// ```no_run
659    /// # use questdb::Result;
660    /// # use questdb::ingress::{Buffer, SenderBuilder};
661    /// use questdb::ingress::TableName;
662    ///
663    /// # fn main() -> Result<()> {
664    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
665    /// # let mut buffer = sender.new_buffer();
666    /// let table_name = TableName::new("table_name")?;
667    /// buffer.table(table_name)?;
668    /// # Ok(())
669    /// # }
670    /// ```
671    pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
672    where
673        N: TryInto<TableName<'a>>,
674        Error: From<N::Error>,
675    {
676        let name: TableName<'a> = name.try_into()?;
677        self.validate_max_name_len(name.name)?;
678        self.check_op(Op::Table)?;
679        let table_begin = self.output.len();
680        write_escaped_unquoted(&mut self.output, name.name);
681        let table_end = self.output.len();
682        self.state.op_case = OpCase::TableWritten;
683
684        // A buffer stops being transactional if it targets multiple tables.
685        if let Some(first_table_len) = &self.state.first_table_len {
686            let first_table = &self.output[0..first_table_len.get()];
687            let this_table = &self.output[table_begin..table_end];
688            if first_table != this_table {
689                self.state.transactional = false;
690            }
691        } else {
692            debug_assert!(table_begin == 0);
693
694            // This is a bit confusing, so worth explaining:
695            // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
696            // but we know that `table_end` is never 0 here, we just need an option type
697            // anyway, so we don't bother unwrapping it to then wrap it again.
698            let first_table_len = NonZeroUsize::new(table_end);
699
700            // Instead we just assert that it's `Some`.
701            debug_assert!(first_table_len.is_some());
702
703            self.state.first_table_len = first_table_len;
704        }
705        Ok(self)
706    }
707
708    /// Record a symbol for the given column.
709    /// Make sure you record all symbol columns before any other column type.
710    ///
711    /// ```no_run
712    /// # use questdb::Result;
713    /// # use questdb::ingress::{Buffer, SenderBuilder};
714    /// # fn main() -> Result<()> {
715    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
716    /// # let mut buffer = sender.new_buffer();
717    /// # buffer.table("x")?;
718    /// buffer.symbol("col_name", "value")?;
719    /// # Ok(())
720    /// # }
721    /// ```
722    ///
723    /// or
724    ///
725    /// ```no_run
726    /// # use questdb::Result;
727    /// # use questdb::ingress::{Buffer, SenderBuilder};
728    /// # fn main() -> Result<()> {
729    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
730    /// # let mut buffer = sender.new_buffer();
731    /// # buffer.table("x")?;
732    /// let value: String = "value".to_owned();
733    /// buffer.symbol("col_name", value)?;
734    /// # Ok(())
735    /// # }
736    /// ```
737    ///
738    /// or
739    ///
740    /// ```no_run
741    /// # use questdb::Result;
742    /// # use questdb::ingress::{Buffer, SenderBuilder};
743    /// use questdb::ingress::ColumnName;
744    ///
745    /// # fn main() -> Result<()> {
746    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
747    /// # let mut buffer = sender.new_buffer();
748    /// # buffer.table("x")?;
749    /// let col_name = ColumnName::new("col_name")?;
750    /// buffer.symbol(col_name, "value")?;
751    /// # Ok(())
752    /// # }
753    /// ```
754    ///
755    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
756    where
757        N: TryInto<ColumnName<'a>>,
758        S: AsRef<str>,
759        Error: From<N::Error>,
760    {
761        let name: ColumnName<'a> = name.try_into()?;
762        self.validate_max_name_len(name.name)?;
763        self.check_op(Op::Symbol)?;
764        self.output.push(b',');
765        write_escaped_unquoted(&mut self.output, name.name);
766        self.output.push(b'=');
767        write_escaped_unquoted(&mut self.output, value.as_ref());
768        self.state.op_case = OpCase::SymbolWritten;
769        Ok(self)
770    }
771
772    fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
773    where
774        N: TryInto<ColumnName<'a>>,
775        Error: From<N::Error>,
776    {
777        let name: ColumnName<'a> = name.try_into()?;
778        self.validate_max_name_len(name.name)?;
779        self.check_op(Op::Column)?;
780        self.output
781            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
782                b' '
783            } else {
784                b','
785            });
786        write_escaped_unquoted(&mut self.output, name.name);
787        self.output.push(b'=');
788        self.state.op_case = OpCase::ColumnWritten;
789        Ok(self)
790    }
791
792    /// Record a boolean value for the given column.
793    ///
794    /// ```no_run
795    /// # use questdb::Result;
796    /// # use questdb::ingress::{Buffer, SenderBuilder};
797    /// # fn main() -> Result<()> {
798    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
799    /// # let mut buffer = sender.new_buffer();
800    /// # buffer.table("x")?;
801    /// buffer.column_bool("col_name", true)?;
802    /// # Ok(())
803    /// # }
804    /// ```
805    ///
806    /// or
807    ///
808    /// ```no_run
809    /// # use questdb::Result;
810    /// # use questdb::ingress::{Buffer, SenderBuilder};
811    /// use questdb::ingress::ColumnName;
812    ///
813    /// # fn main() -> Result<()> {
814    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
815    /// # let mut buffer = sender.new_buffer();
816    /// # buffer.table("x")?;
817    /// let col_name = ColumnName::new("col_name")?;
818    /// buffer.column_bool(col_name, true)?;
819    /// # Ok(())
820    /// # }
821    /// ```
822    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
823    where
824        N: TryInto<ColumnName<'a>>,
825        Error: From<N::Error>,
826    {
827        self.write_column_key(name)?;
828        self.output.push(if value { b't' } else { b'f' });
829        Ok(self)
830    }
831
832    /// Record an integer value for the given column.
833    ///
834    /// ```no_run
835    /// # use questdb::Result;
836    /// # use questdb::ingress::{Buffer, SenderBuilder};
837    /// # fn main() -> Result<()> {
838    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
839    /// # let mut buffer = sender.new_buffer();
840    /// # buffer.table("x")?;
841    /// buffer.column_i64("col_name", 42)?;
842    /// # Ok(())
843    /// # }
844    /// ```
845    ///
846    /// or
847    ///
848    /// ```no_run
849    /// # use questdb::Result;
850    /// # use questdb::ingress::{Buffer, SenderBuilder};
851    /// use questdb::ingress::ColumnName;
852    ///
853    /// # fn main() -> Result<()> {
854    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
855    /// # let mut buffer = sender.new_buffer();
856    /// # buffer.table("x")?;
857    /// let col_name = ColumnName::new("col_name")?;
858    /// buffer.column_i64(col_name, 42)?;
859    /// # Ok(())
860    /// # }
861    /// ```
862    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
863    where
864        N: TryInto<ColumnName<'a>>,
865        Error: From<N::Error>,
866    {
867        self.write_column_key(name)?;
868        let mut buf = itoa::Buffer::new();
869        let printed = buf.format(value);
870        self.output.extend_from_slice(printed.as_bytes());
871        self.output.push(b'i');
872        Ok(self)
873    }
874
875    /// Record a floating point value for the given column.
876    ///
877    /// ```no_run
878    /// # use questdb::Result;
879    /// # use questdb::ingress::{Buffer, SenderBuilder};
880    /// # fn main() -> Result<()> {
881    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
882    /// # let mut buffer = sender.new_buffer();
883    /// # buffer.table("x")?;
884    /// buffer.column_f64("col_name", 3.14)?;
885    /// # Ok(())
886    /// # }
887    /// ```
888    ///
889    /// or
890    ///
891    /// ```no_run
892    /// # use questdb::Result;
893    /// # use questdb::ingress::{Buffer, SenderBuilder};
894    /// use questdb::ingress::ColumnName;
895    ///
896    /// # fn main() -> Result<()> {
897    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
898    /// # let mut buffer = sender.new_buffer();
899    /// # buffer.table("x")?;
900    /// let col_name = ColumnName::new("col_name")?;
901    /// buffer.column_f64(col_name, 3.14)?;
902    /// # Ok(())
903    /// # }
904    /// ```
905    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
906    where
907        N: TryInto<ColumnName<'a>>,
908        Error: From<N::Error>,
909    {
910        self.write_column_key(name)?;
911        if !matches!(self.protocol_version, ProtocolVersion::V1) {
912            self.output.push(b'=');
913            self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
914            self.output.extend_from_slice(&value.to_le_bytes())
915        } else {
916            let mut ser = F64Serializer::new(value);
917            self.output.extend_from_slice(ser.as_str().as_bytes())
918        }
919        Ok(self)
920    }
921
922    /// Record a string value for the given column.
923    ///
924    /// ```no_run
925    /// # use questdb::Result;
926    /// # use questdb::ingress::{Buffer, SenderBuilder};
927    /// # fn main() -> Result<()> {
928    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
929    /// # let mut buffer = sender.new_buffer();
930    /// # buffer.table("x")?;
931    /// buffer.column_str("col_name", "value")?;
932    /// # Ok(())
933    /// # }
934    /// ```
935    ///
936    /// or
937    ///
938    /// ```no_run
939    /// # use questdb::Result;
940    /// # use questdb::ingress::{Buffer, SenderBuilder};
941    /// # fn main() -> Result<()> {
942    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
943    /// # let mut buffer = sender.new_buffer();
944    /// # buffer.table("x")?;
945    /// let value: String = "value".to_owned();
946    /// buffer.column_str("col_name", value)?;
947    /// # Ok(())
948    /// # }
949    /// ```
950    ///
951    /// or
952    ///
953    /// ```no_run
954    /// # use questdb::Result;
955    /// # use questdb::ingress::{Buffer, SenderBuilder};
956    /// use questdb::ingress::ColumnName;
957    ///
958    /// # fn main() -> Result<()> {
959    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
960    /// # let mut buffer = sender.new_buffer();
961    /// # buffer.table("x")?;
962    /// let col_name = ColumnName::new("col_name")?;
963    /// buffer.column_str(col_name, "value")?;
964    /// # Ok(())
965    /// # }
966    /// ```
967    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
968    where
969        N: TryInto<ColumnName<'a>>,
970        S: AsRef<str>,
971        Error: From<N::Error>,
972    {
973        self.write_column_key(name)?;
974        write_escaped_quoted(&mut self.output, value.as_ref());
975        Ok(self)
976    }
977
978    /// Record a multidimensional array value for the given column.
979    ///
980    /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
981    /// be of type `f64`, which is currently the only supported data type.
982    ///
983    /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
984    ///
985    /// # Examples
986    ///
987    /// Recording a 2D array using slices:
988    ///
989    /// ```no_run
990    /// # use questdb::Result;
991    /// # use questdb::ingress::{Buffer, SenderBuilder};
992    /// # fn main() -> Result<()> {
993    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
994    /// # let mut buffer = sender.new_buffer();
995    /// # buffer.table("x")?;
996    /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
997    /// buffer.column_arr("array_col", &array_2d)?;
998    /// # Ok(())
999    /// # }
1000    /// ```
1001    ///
1002    /// Recording a 3D array using vectors:
1003    ///
1004    /// ```no_run
1005    /// # use questdb::Result;
1006    /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
1007    /// # fn main() -> Result<()> {
1008    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1009    /// # let mut buffer = sender.new_buffer();
1010    /// # buffer.table("x1")?;
1011    /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
1012    /// let col_name = ColumnName::new("col1")?;
1013    /// buffer.column_arr(col_name, &array_3d)?;
1014    /// # Ok(())
1015    /// # }
1016    /// ```
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns [`Error`] if:
1021    /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
1022    /// - Failed to get dimension sizes
1023    /// - Column name validation fails
1024    /// - Protocol version v1 is used (arrays require v2+)
1025    #[allow(private_bounds)]
1026    pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
1027    where
1028        N: TryInto<ColumnName<'a>>,
1029        T: NdArrayView<D>,
1030        D: ArrayElement + ArrayElementSealed,
1031        Error: From<N::Error>,
1032    {
1033        if self.protocol_version == ProtocolVersion::V1 {
1034            return Err(error::fmt!(
1035                ProtocolVersionError,
1036                "Protocol version v1 does not support array datatype",
1037            ));
1038        }
1039        let ndim = view.ndim();
1040        if ndim == 0 {
1041            return Err(error::fmt!(
1042                ArrayError,
1043                "Zero-dimensional arrays are not supported",
1044            ));
1045        }
1046
1047        // check dimension less equal than max dims
1048        if MAX_ARRAY_DIMS < ndim {
1049            return Err(error::fmt!(
1050                ArrayError,
1051                "Array dimension mismatch: expected at most {} dimensions, but got {}",
1052                MAX_ARRAY_DIMS,
1053                ndim
1054            ));
1055        }
1056
1057        let array_buf_size = check_and_get_array_bytes_size(view)?;
1058        self.write_column_key(name)?;
1059        // binary format flag '='
1060        self.output.push(b'=');
1061        // binary format entity type
1062        self.output.push(ARRAY_BINARY_FORMAT_TYPE);
1063        // ndarr datatype
1064        self.output.push(D::type_tag());
1065        // ndarr dims
1066        self.output.push(ndim as u8);
1067
1068        let dim_header_size = size_of::<u32>() * ndim;
1069        self.output.reserve(dim_header_size + array_buf_size);
1070
1071        for i in 0..ndim {
1072            // ndarr shape
1073            self.output
1074                .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
1075        }
1076
1077        let index = self.output.len();
1078        let writeable =
1079            unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
1080
1081        // ndarr data
1082        ndarr::write_array_data(view, writeable, array_buf_size)?;
1083        unsafe { self.output.set_len(array_buf_size + index) }
1084        Ok(self)
1085    }
1086
1087    /// Record a timestamp value for the given column.
1088    ///
1089    /// ```no_run
1090    /// # use questdb::Result;
1091    /// # use questdb::ingress::{Buffer, SenderBuilder};
1092    /// use questdb::ingress::TimestampMicros;
1093    /// # fn main() -> Result<()> {
1094    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1095    /// # let mut buffer = sender.new_buffer();
1096    /// # buffer.table("x")?;
1097    /// buffer.column_ts("col_name", TimestampMicros::now())?;
1098    /// # Ok(())
1099    /// # }
1100    /// ```
1101    ///
1102    /// or
1103    ///
1104    /// ```no_run
1105    /// # use questdb::Result;
1106    /// # use questdb::ingress::{Buffer, SenderBuilder};
1107    /// use questdb::ingress::TimestampMicros;
1108    ///
1109    /// # fn main() -> Result<()> {
1110    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1111    /// # let mut buffer = sender.new_buffer();
1112    /// # buffer.table("x")?;
1113    /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1114    /// # Ok(())
1115    /// # }
1116    /// ```
1117    ///
1118    /// or
1119    ///
1120    /// ```no_run
1121    /// # use questdb::Result;
1122    /// # use questdb::ingress::{Buffer, SenderBuilder};
1123    /// use questdb::ingress::TimestampMicros;
1124    /// use questdb::ingress::ColumnName;
1125    ///
1126    /// # fn main() -> Result<()> {
1127    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1128    /// # let mut buffer = sender.new_buffer();
1129    /// # buffer.table("x")?;
1130    /// let col_name = ColumnName::new("col_name")?;
1131    /// buffer.column_ts(col_name, TimestampMicros::now())?;
1132    /// # Ok(())
1133    /// # }
1134    /// ```
1135    ///
1136    /// or you can also pass in a `TimestampNanos`.
1137    ///
1138    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1139    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1140    ///
1141    /// This last option requires the `chrono_timestamp` feature.
1142    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1143    where
1144        N: TryInto<ColumnName<'a>>,
1145        T: TryInto<Timestamp>,
1146        Error: From<N::Error>,
1147        Error: From<T::Error>,
1148    {
1149        self.write_column_key(name)?;
1150        let timestamp: Timestamp = value.try_into()?;
1151        let timestamp: TimestampMicros = timestamp.try_into()?;
1152        let mut buf = itoa::Buffer::new();
1153        let printed = buf.format(timestamp.as_i64());
1154        self.output.extend_from_slice(printed.as_bytes());
1155        self.output.push(b't');
1156        Ok(self)
1157    }
1158
1159    /// Complete the current row with the designated timestamp. After this call, you can
1160    /// start recording the next row by calling [Buffer::table] again, or  you can send
1161    /// the accumulated batch by calling [Sender::flush] or one of its variants.
1162    ///
1163    /// ```no_run
1164    /// # use questdb::Result;
1165    /// # use questdb::ingress::{Buffer, SenderBuilder};
1166    /// use questdb::ingress::TimestampNanos;
1167    /// # fn main() -> Result<()> {
1168    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1169    /// # let mut buffer = sender.new_buffer();
1170    /// # buffer.table("x")?.symbol("a", "b")?;
1171    /// buffer.at(TimestampNanos::now())?;
1172    /// # Ok(())
1173    /// # }
1174    /// ```
1175    ///
1176    /// or
1177    ///
1178    /// ```no_run
1179    /// # use questdb::Result;
1180    /// # use questdb::ingress::{Buffer, SenderBuilder};
1181    /// use questdb::ingress::TimestampNanos;
1182    ///
1183    /// # fn main() -> Result<()> {
1184    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1185    /// # let mut buffer = sender.new_buffer();
1186    /// # buffer.table("x")?.symbol("a", "b")?;
1187    /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1188    /// # Ok(())
1189    /// # }
1190    /// ```
1191    ///
1192    /// You can also pass in a `TimestampMicros`.
1193    ///
1194    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1195    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1196    ///
1197    pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1198    where
1199        T: TryInto<Timestamp>,
1200        Error: From<T::Error>,
1201    {
1202        self.check_op(Op::At)?;
1203        let timestamp: Timestamp = timestamp.try_into()?;
1204
1205        // https://github.com/rust-lang/rust/issues/115880
1206        let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1207        let timestamp: TimestampNanos = timestamp?;
1208
1209        let epoch_nanos = timestamp.as_i64();
1210        if epoch_nanos < 0 {
1211            return Err(error::fmt!(
1212                InvalidTimestamp,
1213                "Timestamp {} is negative. It must be >= 0.",
1214                epoch_nanos
1215            ));
1216        }
1217        let mut buf = itoa::Buffer::new();
1218        let printed = buf.format(epoch_nanos);
1219        self.output.push(b' ');
1220        self.output.extend_from_slice(printed.as_bytes());
1221        self.output.push(b'\n');
1222        self.state.op_case = OpCase::MayFlushOrTable;
1223        self.state.row_count += 1;
1224        Ok(())
1225    }
1226
1227    /// Complete the current row without providing a timestamp. The QuestDB instance
1228    /// will insert its own timestamp.
1229    ///
1230    /// Letting the server assign the timestamp can be faster since it reliably avoids
1231    /// out-of-order operations in the database for maximum ingestion throughput. However,
1232    /// it removes the ability to deduplicate rows.
1233    ///
1234    /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1235    /// server will set the timestamp only after receiving the row. If you're flushing
1236    /// infrequently, the server-assigned timestamp may be significantly behind the
1237    /// time the data was recorded in the buffer.
1238    ///
1239    /// In almost all cases, you should prefer the [Buffer::at] function.
1240    ///
1241    /// After this call, you can start recording the next row by calling [Buffer::table]
1242    /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1243    /// its variants.
1244    ///
1245    /// ```no_run
1246    /// # use questdb::Result;
1247    /// # use questdb::ingress::{Buffer, SenderBuilder};
1248    /// # fn main() -> Result<()> {
1249    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1250    /// # let mut buffer = sender.new_buffer();
1251    /// # buffer.table("x")?.symbol("a", "b")?;
1252    /// buffer.at_now()?;
1253    /// # Ok(())
1254    /// # }
1255    /// ```
1256    pub fn at_now(&mut self) -> crate::Result<()> {
1257        self.check_op(Op::At)?;
1258        self.output.push(b'\n');
1259        self.state.op_case = OpCase::MayFlushOrTable;
1260        self.state.row_count += 1;
1261        Ok(())
1262    }
1263}
1264
1265impl Debug for Buffer {
1266    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1267        f.debug_struct("Buffer")
1268            .field("output", &DebugBytes(&self.output))
1269            .field("state", &self.state)
1270            .field("marker", &self.marker)
1271            .field("max_name_len", &self.max_name_len)
1272            .field("protocol_version", &self.protocol_version)
1273            .finish()
1274    }
1275}