questdb/ingress/
buffer.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25use crate::ingress::decimal::DecimalView;
26use crate::ingress::ndarr::{ArrayElementSealed, check_and_get_array_bytes_size};
27use crate::ingress::{
28    ARRAY_BINARY_FORMAT_TYPE, ArrayElement, DOUBLE_BINARY_FORMAT_TYPE, DebugBytes, MAX_ARRAY_DIMS,
29    MAX_NAME_LEN_DEFAULT, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros, TimestampNanos,
30    ndarr,
31};
32use crate::{Error, error};
33use std::fmt::{Debug, Formatter};
34use std::num::NonZeroUsize;
35use std::slice::from_raw_parts_mut;
36
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` exactly twice: once before and once after
/// the (escaped) payload. Callers use it to emit a surrounding quote character,
/// or pass a no-op when no quoting is wanted.
///
/// Bytes are tested one at a time on the UTF-8 encoding of `s`; bytes that are
/// not selected by `check_escape_fn` are copied through unchanged.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // First pass: count the escapes so the common "nothing to escape" case is a
    // single bulk copy and the slow path grows the vector at most once.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        output.extend_from_slice(s.as_bytes());
    } else {
        // Capacity is reserved up front, so the pushes below do not reallocate.
        // Plain `push` is used instead of `set_len` + unchecked writes: it never
        // exposes uninitialized bytes and stays in bounds even if the predicate
        // were to answer differently between the two passes.
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
76
/// Tells whether byte `c` must be preceded by a backslash when it is written
/// outside of double quotes: space, comma, equals sign, line feed, carriage
/// return and the backslash itself.
fn must_escape_unquoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b" ,=\n\r\\";
    SPECIAL.contains(&c)
}
80
/// Tells whether byte `c` must be preceded by a backslash when it is written
/// inside double quotes: line feed, carriage return, the double quote and the
/// backslash itself.
fn must_escape_quoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b"\n\r\"\\";
    SPECIAL.contains(&c)
}
84
85fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
86    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
87}
88
89fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
90    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
91}
92
93pub(crate) struct F64Serializer {
94    buf: ryu::Buffer,
95    n: f64,
96}
97
98impl F64Serializer {
99    pub(crate) fn new(n: f64) -> Self {
100        F64Serializer {
101            buf: ryu::Buffer::new(),
102            n,
103        }
104    }
105
106    // This function was taken and customized from the ryu crate.
107    #[cold]
108    fn format_nonfinite(&self) -> &'static str {
109        const MANTISSA_MASK: u64 = 0x000fffffffffffff;
110        const SIGN_MASK: u64 = 0x8000000000000000;
111        let bits = self.n.to_bits();
112        if bits & MANTISSA_MASK != 0 {
113            "NaN"
114        } else if bits & SIGN_MASK != 0 {
115            "-Infinity"
116        } else {
117            "Infinity"
118        }
119    }
120
121    pub(crate) fn as_str(&mut self) -> &str {
122        if self.n.is_finite() {
123            self.buf.format_finite(self.n)
124        } else {
125            self.format_nonfinite()
126        }
127    }
128}
129
/// One buffer API operation. Each variant occupies its own bit so that a set of
/// operations can be expressed as a bitmask of these values.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// The name of the operation as it appears in error messages.
    fn descr(self) -> &'static str {
        use Op::*;
        match self {
            Table => "table",
            Symbol => "symbol",
            Column => "column",
            At => "at",
            Flush => "flush",
        }
    }
}
150
151#[derive(Debug, Copy, Clone, PartialEq)]
152enum OpCase {
153    Init = Op::Table as isize,
154    TableWritten = Op::Symbol as isize | Op::Column as isize,
155    SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
156    ColumnWritten = Op::Column as isize | Op::At as isize,
157    MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
158}
159
160impl OpCase {
161    fn next_op_descr(self) -> &'static str {
162        match self {
163            OpCase::Init => "should have called `table` instead",
164            OpCase::TableWritten => "should have called `symbol` or `column` instead",
165            OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
166            OpCase::ColumnWritten => "should have called `column` or `at` instead",
167            OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
168        }
169    }
170}
171
172// IMPORTANT: This struct MUST remain `Copy` to ensure that
173// there are no heap allocations when performing marker operations.
174#[derive(Debug, Clone, Copy)]
175struct BufferState {
176    op_case: OpCase,
177    row_count: usize,
178    first_table_len: Option<NonZeroUsize>,
179    transactional: bool,
180}
181
182impl BufferState {
183    fn new() -> Self {
184        Self {
185            op_case: OpCase::Init,
186            row_count: 0,
187            first_table_len: None,
188            transactional: true,
189        }
190    }
191}
192
193/// A validated table name.
194///
195/// This type simply wraps a `&str`.
196///
197/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
198/// it doesn't have to validate it again. This saves CPU cycles.
199#[derive(Clone, Copy)]
200pub struct TableName<'a> {
201    name: &'a str,
202}
203
204impl<'a> TableName<'a> {
205    /// Construct a validated table name.
206    pub fn new(name: &'a str) -> crate::Result<Self> {
207        if name.is_empty() {
208            return Err(error::fmt!(
209                InvalidName,
210                "Table names must have a non-zero length."
211            ));
212        }
213
214        let mut prev = '\0';
215        for (index, c) in name.chars().enumerate() {
216            match c {
217                '.' => {
218                    if index == 0 || index == name.len() - 1 || prev == '.' {
219                        return Err(error::fmt!(
220                            InvalidName,
221                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
222                            name,
223                            index
224                        ));
225                    }
226                }
227                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
228                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
229                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
230                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
231                    return Err(error::fmt!(
232                        InvalidName,
233                        concat!(
234                            "Bad string {:?}: ",
235                            "Table names can't contain ",
236                            "a {:?} character, which was found at ",
237                            "byte position {}."
238                        ),
239                        name,
240                        c,
241                        index
242                    ));
243                }
244                '\u{feff}' => {
245                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
246                    // aka UTF-8 BOM if it appears anywhere in the string.
247                    return Err(error::fmt!(
248                        InvalidName,
249                        concat!(
250                            "Bad string {:?}: ",
251                            "Table names can't contain ",
252                            "a UTF-8 BOM character, which was found at ",
253                            "byte position {}."
254                        ),
255                        name,
256                        index
257                    ));
258                }
259                _ => (),
260            }
261            prev = c;
262        }
263
264        Ok(Self { name })
265    }
266
267    /// Construct a table name without validating it.
268    ///
269    /// This breaks API encapsulation and is only intended for use
270    /// when the string was already previously validated.
271    ///
272    /// The QuestDB server will reject an invalid table name.
273    pub fn new_unchecked(name: &'a str) -> Self {
274        Self { name }
275    }
276}
277
278impl<'a> TryFrom<&'a str> for TableName<'a> {
279    type Error = Error;
280
281    fn try_from(name: &'a str) -> crate::Result<Self> {
282        Self::new(name)
283    }
284}
285
286impl AsRef<str> for TableName<'_> {
287    fn as_ref(&self) -> &str {
288        self.name
289    }
290}
291
292/// A validated column name.
293///
294/// This type simply wraps a `&str`.
295///
296/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
297/// it doesn't have to validate it again. This saves CPU cycles.
298#[derive(Clone, Copy)]
299pub struct ColumnName<'a> {
300    name: &'a str,
301}
302
303impl<'a> ColumnName<'a> {
304    /// Construct a validated table name.
305    pub fn new(name: &'a str) -> crate::Result<Self> {
306        if name.is_empty() {
307            return Err(error::fmt!(
308                InvalidName,
309                "Column names must have a non-zero length."
310            ));
311        }
312
313        for (index, c) in name.chars().enumerate() {
314            match c {
315                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
316                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
317                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
318                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
319                    return Err(error::fmt!(
320                        InvalidName,
321                        concat!(
322                            "Bad string {:?}: ",
323                            "Column names can't contain ",
324                            "a {:?} character, which was found at ",
325                            "byte position {}."
326                        ),
327                        name,
328                        c,
329                        index
330                    ));
331                }
332                '\u{FEFF}' => {
333                    // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
334                    // aka UTF-8 BOM if it appears anywhere in the string.
335                    return Err(error::fmt!(
336                        InvalidName,
337                        concat!(
338                            "Bad string {:?}: ",
339                            "Column names can't contain ",
340                            "a UTF-8 BOM character, which was found at ",
341                            "byte position {}."
342                        ),
343                        name,
344                        index
345                    ));
346                }
347                _ => (),
348            }
349        }
350
351        Ok(Self { name })
352    }
353
354    /// Construct a column name without validating it.
355    ///
356    /// This breaks API encapsulation and is only intended for use
357    /// when the string was already previously validated.
358    ///
359    /// The QuestDB server will reject an invalid column name.
360    pub fn new_unchecked(name: &'a str) -> Self {
361        Self { name }
362    }
363}
364
365impl<'a> TryFrom<&'a str> for ColumnName<'a> {
366    type Error = Error;
367
368    fn try_from(name: &'a str) -> crate::Result<Self> {
369        Self::new(name)
370    }
371}
372
373impl AsRef<str> for ColumnName<'_> {
374    fn as_ref(&self) -> &str {
375        self.name
376    }
377}
378
379/// A reusable buffer to prepare a batch of ILP messages.
380///
381/// # Example
382///
383/// ```no_run
384/// # use questdb::Result;
385/// # use questdb::ingress::SenderBuilder;
386///
387/// # fn main() -> Result<()> {
388/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
389/// # use questdb::Result;
390/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
391/// let mut buffer = sender.new_buffer();
392///
393/// // first row
394/// buffer
395///     .table("table1")?
396///     .symbol("bar", "baz")?
397///     .column_bool("a", false)?
398///     .column_i64("b", 42)?
399///     .column_f64("c", 3.14)?
400///     .column_str("d", "hello")?
401///     .column_ts("e", TimestampMicros::now())?
402///     .at(TimestampNanos::now())?;
403///
404/// // second row
405/// buffer
406///     .table("table2")?
407///     .symbol("foo", "bar")?
408///     .at(TimestampNanos::now())?;
409/// # Ok(())
410/// # }
411/// ```
412///
413/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
414///
415/// # Sequential Coupling
416/// The Buffer API is sequentially coupled:
417///   * A row always starts with [`table`](Buffer::table).
418///   * A row must contain at least one [`symbol`](Buffer::symbol) or
419///     column (
420///     [`column_bool`](Buffer::column_bool),
421///     [`column_i64`](Buffer::column_i64),
422///     [`column_f64`](Buffer::column_f64),
423///     [`column_str`](Buffer::column_str),
424///     [`column_arr`](Buffer::column_arr),
425///     [`column_ts`](Buffer::column_ts)).
426///   * Symbols must appear before columns.
427///   * A row must be terminated with either [`at`](Buffer::at) or
428///     [`at_now`](Buffer::at_now).
429///
430/// This diagram visualizes the sequence:
431///
432/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
433///
434/// # Buffer method calls, Serialized ILP types and QuestDB types
435///
436/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
437/// |---------------|--------------------------------------------------------------|
438/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
439/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
440/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
441/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
442/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
443/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
444/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
445///
446/// QuestDB supports both `STRING` and `SYMBOL` column types.
447///
448/// To understand the difference, refer to the
449/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
450/// symbols are interned strings, most suitable for identifiers that are repeated many
451/// times throughout the column. They offer an advantage in storage space and query
452/// performance.
453///
454/// # Inserting NULL values
455///
456/// To insert a NULL value, skip the symbol or column for that row.
457///
458/// # Recovering from validation errors
459///
460/// If you want to recover from potential validation errors, call
461/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
462/// append as many rows or parts of rows as you like, and then call
463/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
464///
465/// If there was an error in one of the rows, use
466/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
467/// marked last known good state.
468///
469#[derive(Clone)]
470pub struct Buffer {
471    output: Vec<u8>,
472    state: BufferState,
473    marker: Option<(usize, BufferState)>,
474    max_name_len: usize,
475    protocol_version: ProtocolVersion,
476}
477
478impl Buffer {
479    /// Creates a new [`Buffer`] with default parameters.
480    ///
481    /// - Uses the specified protocol version
482    /// - Sets maximum name length to **127 characters** (QuestDB server default)
483    ///
484    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
485    /// and [`Sender::max_name_len`] is 127.
486    ///
487    /// For custom name lengths, use [`Self::with_max_name_len`]
488    pub fn new(protocol_version: ProtocolVersion) -> Self {
489        Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
490    }
491
492    /// Creates a new [`Buffer`] with a custom maximum name length.
493    ///
494    /// - `max_name_len`: Maximum allowed length for table/column names, match
495    ///   your QuestDB server's `cairo.max.file.name.length` configuration
496    /// - `protocol_version`: Protocol version to use
497    ///
498    /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
499    /// and [`Sender::max_name_len`].
500    ///
501    /// For the default max name length limit (127), use [`Self::new`].
502    pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
503        Self {
504            output: Vec::new(),
505            state: BufferState::new(),
506            marker: None,
507            max_name_len,
508            protocol_version,
509        }
510    }
511
512    pub fn protocol_version(&self) -> ProtocolVersion {
513        self.protocol_version
514    }
515
516    /// Pre-allocate to ensure the buffer has enough capacity for at least the
517    /// specified additional byte count. This may be rounded up.
518    /// This does not allocate if such additional capacity is already satisfied.
519    /// See: `capacity`.
520    pub fn reserve(&mut self, additional: usize) {
521        self.output.reserve(additional);
522    }
523
524    /// The number of bytes accumulated in the buffer.
525    pub fn len(&self) -> usize {
526        self.output.len()
527    }
528
529    /// The number of rows accumulated in the buffer.
530    pub fn row_count(&self) -> usize {
531        self.state.row_count
532    }
533
534    /// Tells whether the buffer is transactional. It is transactional iff it contains
535    /// data for at most one table. Additionally, you must send the buffer over HTTP to
536    /// get transactional behavior.
537    pub fn transactional(&self) -> bool {
538        self.state.transactional
539    }
540
541    pub fn is_empty(&self) -> bool {
542        self.output.is_empty()
543    }
544
545    /// The total number of bytes the buffer can hold before it needs to resize.
546    pub fn capacity(&self) -> usize {
547        self.output.capacity()
548    }
549
550    pub fn as_bytes(&self) -> &[u8] {
551        &self.output
552    }
553
554    /// Mark a rewind point.
555    /// This allows undoing accumulated changes to the buffer for one or more
556    /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
557    /// Any previous marker will be discarded.
558    /// Once the marker is no longer needed, call
559    /// [`clear_marker`](Buffer::clear_marker).
560    pub fn set_marker(&mut self) -> crate::Result<()> {
561        if (self.state.op_case as isize & Op::Table as isize) == 0 {
562            return Err(error::fmt!(
563                InvalidApiCall,
564                concat!(
565                    "Can't set the marker whilst constructing a line. ",
566                    "A marker may only be set on an empty buffer or after ",
567                    "`at` or `at_now` is called."
568                )
569            ));
570        }
571        self.marker = Some((self.output.len(), self.state));
572        Ok(())
573    }
574
575    /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
576    /// call.
577    ///
578    /// As a side effect, this also clears the marker.
579    pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
580        if let Some((position, state)) = self.marker.take() {
581            self.output.truncate(position);
582            self.state = state;
583            Ok(())
584        } else {
585            Err(error::fmt!(
586                InvalidApiCall,
587                "Can't rewind to the marker: No marker set."
588            ))
589        }
590    }
591
592    /// Discard any marker as may have been set by
593    /// [`set_marker`](Buffer::set_marker).
594    ///
595    /// Idempotent.
596    pub fn clear_marker(&mut self) {
597        self.marker = None;
598    }
599
600    /// Reset the buffer and clear contents whilst retaining
601    /// [`capacity`](Buffer::capacity).
602    pub fn clear(&mut self) {
603        self.output.clear();
604        self.state = BufferState::new();
605        self.marker = None;
606    }
607
608    /// Check if the next API operation is allowed as per the OP case state machine.
609    #[inline(always)]
610    fn check_op(&self, op: Op) -> crate::Result<()> {
611        if (self.state.op_case as isize & op as isize) > 0 {
612            Ok(())
613        } else {
614            Err(error::fmt!(
615                InvalidApiCall,
616                "State error: Bad call to `{}`, {}.",
617                op.descr(),
618                self.state.op_case.next_op_descr()
619            ))
620        }
621    }
622
623    /// Checks if this buffer is ready to be flushed to a sender via one of the
624    /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
625    /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
626    /// message indicating why this [`Buffer`] cannot be flushed at the moment.
627    #[inline(always)]
628    pub fn check_can_flush(&self) -> crate::Result<()> {
629        self.check_op(Op::Flush)
630    }
631
632    #[inline(always)]
633    fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
634        if name.len() > self.max_name_len {
635            return Err(error::fmt!(
636                InvalidName,
637                "Bad name: {:?}: Too long (max {} characters)",
638                name,
639                self.max_name_len
640            ));
641        }
642        Ok(())
643    }
644
645    /// Begin recording a new row for the given table.
646    ///
647    /// ```no_run
648    /// # use questdb::Result;
649    /// # use questdb::ingress::{Buffer, SenderBuilder};
650    /// # fn main() -> Result<()> {
651    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
652    /// # let mut buffer = sender.new_buffer();
653    /// buffer.table("table_name")?;
654    /// # Ok(())
655    /// # }
656    /// ```
657    ///
658    /// or
659    ///
660    /// ```no_run
661    /// # use questdb::Result;
662    /// # use questdb::ingress::{Buffer, SenderBuilder};
663    /// use questdb::ingress::TableName;
664    ///
665    /// # fn main() -> Result<()> {
666    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
667    /// # let mut buffer = sender.new_buffer();
668    /// let table_name = TableName::new("table_name")?;
669    /// buffer.table(table_name)?;
670    /// # Ok(())
671    /// # }
672    /// ```
673    pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
674    where
675        N: TryInto<TableName<'a>>,
676        Error: From<N::Error>,
677    {
678        let name: TableName<'a> = name.try_into()?;
679        self.validate_max_name_len(name.name)?;
680        self.check_op(Op::Table)?;
681        let table_begin = self.output.len();
682        write_escaped_unquoted(&mut self.output, name.name);
683        let table_end = self.output.len();
684        self.state.op_case = OpCase::TableWritten;
685
686        // A buffer stops being transactional if it targets multiple tables.
687        if let Some(first_table_len) = &self.state.first_table_len {
688            let first_table = &self.output[0..first_table_len.get()];
689            let this_table = &self.output[table_begin..table_end];
690            if first_table != this_table {
691                self.state.transactional = false;
692            }
693        } else {
694            debug_assert!(table_begin == 0);
695
696            // This is a bit confusing, so worth explaining:
697            // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
698            // but we know that `table_end` is never 0 here, we just need an option type
699            // anyway, so we don't bother unwrapping it to then wrap it again.
700            let first_table_len = NonZeroUsize::new(table_end);
701
702            // Instead we just assert that it's `Some`.
703            debug_assert!(first_table_len.is_some());
704
705            self.state.first_table_len = first_table_len;
706        }
707        Ok(self)
708    }
709
710    /// Record a symbol for the given column.
711    /// Make sure you record all symbol columns before any other column type.
712    ///
713    /// ```no_run
714    /// # use questdb::Result;
715    /// # use questdb::ingress::{Buffer, SenderBuilder};
716    /// # fn main() -> Result<()> {
717    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
718    /// # let mut buffer = sender.new_buffer();
719    /// # buffer.table("x")?;
720    /// buffer.symbol("col_name", "value")?;
721    /// # Ok(())
722    /// # }
723    /// ```
724    ///
725    /// or
726    ///
727    /// ```no_run
728    /// # use questdb::Result;
729    /// # use questdb::ingress::{Buffer, SenderBuilder};
730    /// # fn main() -> Result<()> {
731    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
732    /// # let mut buffer = sender.new_buffer();
733    /// # buffer.table("x")?;
734    /// let value: String = "value".to_owned();
735    /// buffer.symbol("col_name", value)?;
736    /// # Ok(())
737    /// # }
738    /// ```
739    ///
740    /// or
741    ///
742    /// ```no_run
743    /// # use questdb::Result;
744    /// # use questdb::ingress::{Buffer, SenderBuilder};
745    /// use questdb::ingress::ColumnName;
746    ///
747    /// # fn main() -> Result<()> {
748    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
749    /// # let mut buffer = sender.new_buffer();
750    /// # buffer.table("x")?;
751    /// let col_name = ColumnName::new("col_name")?;
752    /// buffer.symbol(col_name, "value")?;
753    /// # Ok(())
754    /// # }
755    /// ```
756    ///
757    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
758    where
759        N: TryInto<ColumnName<'a>>,
760        S: AsRef<str>,
761        Error: From<N::Error>,
762    {
763        let name: ColumnName<'a> = name.try_into()?;
764        self.validate_max_name_len(name.name)?;
765        self.check_op(Op::Symbol)?;
766        self.output.push(b',');
767        write_escaped_unquoted(&mut self.output, name.name);
768        self.output.push(b'=');
769        write_escaped_unquoted(&mut self.output, value.as_ref());
770        self.state.op_case = OpCase::SymbolWritten;
771        Ok(self)
772    }
773
774    fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
775    where
776        N: TryInto<ColumnName<'a>>,
777        Error: From<N::Error>,
778    {
779        let name: ColumnName<'a> = name.try_into()?;
780        self.validate_max_name_len(name.name)?;
781        self.check_op(Op::Column)?;
782        self.output
783            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
784                b' '
785            } else {
786                b','
787            });
788        write_escaped_unquoted(&mut self.output, name.name);
789        self.output.push(b'=');
790        self.state.op_case = OpCase::ColumnWritten;
791        Ok(self)
792    }
793
794    /// Record a boolean value for the given column.
795    ///
796    /// ```no_run
797    /// # use questdb::Result;
798    /// # use questdb::ingress::{Buffer, SenderBuilder};
799    /// # fn main() -> Result<()> {
800    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
801    /// # let mut buffer = sender.new_buffer();
802    /// # buffer.table("x")?;
803    /// buffer.column_bool("col_name", true)?;
804    /// # Ok(())
805    /// # }
806    /// ```
807    ///
808    /// or
809    ///
810    /// ```no_run
811    /// # use questdb::Result;
812    /// # use questdb::ingress::{Buffer, SenderBuilder};
813    /// use questdb::ingress::ColumnName;
814    ///
815    /// # fn main() -> Result<()> {
816    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
817    /// # let mut buffer = sender.new_buffer();
818    /// # buffer.table("x")?;
819    /// let col_name = ColumnName::new("col_name")?;
820    /// buffer.column_bool(col_name, true)?;
821    /// # Ok(())
822    /// # }
823    /// ```
824    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
825    where
826        N: TryInto<ColumnName<'a>>,
827        Error: From<N::Error>,
828    {
829        self.write_column_key(name)?;
830        self.output.push(if value { b't' } else { b'f' });
831        Ok(self)
832    }
833
834    /// Record an integer value for the given column.
835    ///
836    /// ```no_run
837    /// # use questdb::Result;
838    /// # use questdb::ingress::{Buffer, SenderBuilder};
839    /// # fn main() -> Result<()> {
840    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
841    /// # let mut buffer = sender.new_buffer();
842    /// # buffer.table("x")?;
843    /// buffer.column_i64("col_name", 42)?;
844    /// # Ok(())
845    /// # }
846    /// ```
847    ///
848    /// or
849    ///
850    /// ```no_run
851    /// # use questdb::Result;
852    /// # use questdb::ingress::{Buffer, SenderBuilder};
853    /// use questdb::ingress::ColumnName;
854    ///
855    /// # fn main() -> Result<()> {
856    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
857    /// # let mut buffer = sender.new_buffer();
858    /// # buffer.table("x")?;
859    /// let col_name = ColumnName::new("col_name")?;
860    /// buffer.column_i64(col_name, 42)?;
861    /// # Ok(())
862    /// # }
863    /// ```
864    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
865    where
866        N: TryInto<ColumnName<'a>>,
867        Error: From<N::Error>,
868    {
869        self.write_column_key(name)?;
870        let mut buf = itoa::Buffer::new();
871        let printed = buf.format(value);
872        self.output.extend_from_slice(printed.as_bytes());
873        self.output.push(b'i');
874        Ok(self)
875    }
876
877    /// Record a floating point value for the given column.
878    ///
879    /// ```no_run
880    /// # use questdb::Result;
881    /// # use questdb::ingress::{Buffer, SenderBuilder};
882    /// # fn main() -> Result<()> {
883    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
884    /// # let mut buffer = sender.new_buffer();
885    /// # buffer.table("x")?;
886    /// buffer.column_f64("col_name", 3.14)?;
887    /// # Ok(())
888    /// # }
889    /// ```
890    ///
891    /// or
892    ///
893    /// ```no_run
894    /// # use questdb::Result;
895    /// # use questdb::ingress::{Buffer, SenderBuilder};
896    /// use questdb::ingress::ColumnName;
897    ///
898    /// # fn main() -> Result<()> {
899    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
900    /// # let mut buffer = sender.new_buffer();
901    /// # buffer.table("x")?;
902    /// let col_name = ColumnName::new("col_name")?;
903    /// buffer.column_f64(col_name, 3.14)?;
904    /// # Ok(())
905    /// # }
906    /// ```
907    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
908    where
909        N: TryInto<ColumnName<'a>>,
910        Error: From<N::Error>,
911    {
912        self.write_column_key(name)?;
913        if !matches!(self.protocol_version, ProtocolVersion::V1) {
914            self.output.push(b'=');
915            self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
916            self.output.extend_from_slice(&value.to_le_bytes())
917        } else {
918            let mut ser = F64Serializer::new(value);
919            self.output.extend_from_slice(ser.as_str().as_bytes())
920        }
921        Ok(self)
922    }
923
924    /// Record a string value for the given column.
925    ///
926    /// ```no_run
927    /// # use questdb::Result;
928    /// # use questdb::ingress::{Buffer, SenderBuilder};
929    /// # fn main() -> Result<()> {
930    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
931    /// # let mut buffer = sender.new_buffer();
932    /// # buffer.table("x")?;
933    /// buffer.column_str("col_name", "value")?;
934    /// # Ok(())
935    /// # }
936    /// ```
937    ///
938    /// or
939    ///
940    /// ```no_run
941    /// # use questdb::Result;
942    /// # use questdb::ingress::{Buffer, SenderBuilder};
943    /// # fn main() -> Result<()> {
944    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
945    /// # let mut buffer = sender.new_buffer();
946    /// # buffer.table("x")?;
947    /// let value: String = "value".to_owned();
948    /// buffer.column_str("col_name", value)?;
949    /// # Ok(())
950    /// # }
951    /// ```
952    ///
953    /// or
954    ///
955    /// ```no_run
956    /// # use questdb::Result;
957    /// # use questdb::ingress::{Buffer, SenderBuilder};
958    /// use questdb::ingress::ColumnName;
959    ///
960    /// # fn main() -> Result<()> {
961    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
962    /// # let mut buffer = sender.new_buffer();
963    /// # buffer.table("x")?;
964    /// let col_name = ColumnName::new("col_name")?;
965    /// buffer.column_str(col_name, "value")?;
966    /// # Ok(())
967    /// # }
968    /// ```
969    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
970    where
971        N: TryInto<ColumnName<'a>>,
972        S: AsRef<str>,
973        Error: From<N::Error>,
974    {
975        self.write_column_key(name)?;
976        write_escaped_quoted(&mut self.output, value.as_ref());
977        Ok(self)
978    }
979
980    /// Record a decimal value for the given column.
981    ///
982    /// ```no_run
983    /// # use questdb::Result;
984    /// # use questdb::ingress::{Buffer, SenderBuilder};
985    /// # fn main() -> Result<()> {
986    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
987    /// # let mut buffer = sender.new_buffer();
988    /// # buffer.table("x")?;
989    /// buffer.column_dec("col_name", "123.45")?;
990    /// # Ok(())
991    /// # }
992    /// ```
993    ///
994    /// When specifying a decimal as a string, use a '.' to separate the whole from the
995    /// fractional parts. For example, "12.20".
996    /// Infinity is encoded as "+Infinity" or "-Infinity", while NaN as "NaN".
997    /// Note that Infinity and NaN values decay to nulls when stored in the database.
998    ///
999    /// or
1000    /// ```no_run
1001    /// # use questdb::Result;
1002    /// # use questdb::ingress::{Buffer, SenderBuilder};
1003    /// use questdb::ingress::ColumnName;
1004    ///
1005    /// # fn main() -> Result<()> {
1006    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1007    /// # let mut buffer = sender.new_buffer();
1008    /// # buffer.table("x")?;
1009    /// let col_name = ColumnName::new("col_name")?;
1010    /// buffer.column_dec(col_name, "123.45")?;
1011    /// # Ok(())
1012    /// # }
1013    /// ```
1014    ///
1015    /// With `rust_decimal` feature enabled:
1016    ///
1017    /// ```no_run
1018    /// # #[cfg(feature = "rust_decimal")]
1019    /// # {
1020    /// # use questdb::Result;
1021    /// # use questdb::ingress::{Buffer, SenderBuilder};
1022    /// use rust_decimal::Decimal;
1023    /// use std::str::FromStr;
1024    ///
1025    /// # fn main() -> Result<()> {
1026    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1027    /// # let mut buffer = sender.new_buffer();
1028    /// # buffer.table("x")?;
1029    /// let value = Decimal::from_str("123.45").unwrap();
1030    /// buffer.column_dec("col_name", &value)?;
1031    /// # Ok(())
1032    /// # }
1033    /// # }
1034    /// ```
1035    ///
1036    /// With `bigdecimal` feature enabled:
1037    ///
1038    /// ```no_run
1039    /// # #[cfg(feature = "bigdecimal")]
1040    /// # {
1041    /// # use questdb::Result;
1042    /// # use questdb::ingress::{Buffer, SenderBuilder};
1043    /// use bigdecimal::BigDecimal;
1044    /// use std::str::FromStr;
1045    ///
1046    /// # fn main() -> Result<()> {
1047    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1048    /// # let mut buffer = sender.new_buffer();
1049    /// # buffer.table("x")?;
1050    /// let value = BigDecimal::from_str("0.123456789012345678901234567890").unwrap();
1051    /// buffer.column_dec("col_name", &value)?;
1052    /// # Ok(())
1053    /// # }
1054    /// # }
1055    /// ```
1056    pub fn column_dec<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
1057    where
1058        N: TryInto<ColumnName<'a>>,
1059        S: TryInto<DecimalView<'a>>,
1060        Error: From<N::Error>,
1061        Error: From<S::Error>,
1062    {
1063        if self.protocol_version < ProtocolVersion::V3 {
1064            return Err(error::fmt!(
1065                ProtocolVersionError,
1066                "Protocol version {} does not support the decimal datatype",
1067                self.protocol_version
1068            ));
1069        }
1070
1071        let value: DecimalView = value.try_into()?;
1072        self.write_column_key(name)?;
1073        value.serialize(&mut self.output);
1074        Ok(self)
1075    }
1076
1077    /// Record a multidimensional array value for the given column.
1078    ///
1079    /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
1080    /// be of type `f64`, which is currently the only supported data type.
1081    ///
1082    /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
1083    ///
1084    /// # Examples
1085    ///
1086    /// Recording a 2D array using nested vectors:
1087    ///
1088    /// ```no_run
1089    /// # use questdb::Result;
1090    /// # use questdb::ingress::{Buffer, SenderBuilder};
1091    /// # fn main() -> Result<()> {
1092    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1093    /// # let mut buffer = sender.new_buffer();
1094    /// # buffer.table("x")?;
1095    /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
1096    /// buffer.column_arr("array_col", &array_2d)?;
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    ///
1101    /// Recording a 3D array using vectors:
1102    ///
1103    /// ```no_run
1104    /// # use questdb::Result;
1105    /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
1106    /// # fn main() -> Result<()> {
1107    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1108    /// # let mut buffer = sender.new_buffer();
1109    /// # buffer.table("x1")?;
1110    /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
1111    /// let col_name = ColumnName::new("col1")?;
1112    /// buffer.column_arr(col_name, &array_3d)?;
1113    /// # Ok(())
1114    /// # }
1115    /// ```
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns [`Error`] if:
1120    /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
1121    /// - Failed to get dimension sizes
1122    /// - Column name validation fails
1123    /// - Protocol version v1 is used (arrays require v2+)
1124    #[allow(private_bounds)]
1125    pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
1126    where
1127        N: TryInto<ColumnName<'a>>,
1128        T: NdArrayView<D>,
1129        D: ArrayElement + ArrayElementSealed,
1130        Error: From<N::Error>,
1131    {
1132        if self.protocol_version < ProtocolVersion::V2 {
1133            return Err(error::fmt!(
1134                ProtocolVersionError,
1135                "Protocol version {} does not support array datatype",
1136                self.protocol_version
1137            ));
1138        }
1139        let ndim = view.ndim();
1140        if ndim == 0 {
1141            return Err(error::fmt!(
1142                ArrayError,
1143                "Zero-dimensional arrays are not supported",
1144            ));
1145        }
1146
1147        // check dimension less equal than max dims
1148        if MAX_ARRAY_DIMS < ndim {
1149            return Err(error::fmt!(
1150                ArrayError,
1151                "Array dimension mismatch: expected at most {} dimensions, but got {}",
1152                MAX_ARRAY_DIMS,
1153                ndim
1154            ));
1155        }
1156
1157        let array_buf_size = check_and_get_array_bytes_size(view)?;
1158        self.write_column_key(name)?;
1159        // binary format flag '='
1160        self.output.push(b'=');
1161        // binary format entity type
1162        self.output.push(ARRAY_BINARY_FORMAT_TYPE);
1163        // ndarr datatype
1164        self.output.push(D::type_tag());
1165        // ndarr dims
1166        self.output.push(ndim as u8);
1167
1168        let dim_header_size = size_of::<u32>() * ndim;
1169        self.output.reserve(dim_header_size + array_buf_size);
1170
1171        for i in 0..ndim {
1172            // ndarr shape
1173            self.output
1174                .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
1175        }
1176
1177        let index = self.output.len();
1178        let writeable =
1179            unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
1180
1181        // ndarr data
1182        ndarr::write_array_data(view, writeable, array_buf_size)?;
1183        unsafe { self.output.set_len(array_buf_size + index) }
1184        Ok(self)
1185    }
1186
1187    /// Record a timestamp value for the given column.
1188    ///
1189    /// ```no_run
1190    /// # use questdb::Result;
1191    /// # use questdb::ingress::{Buffer, SenderBuilder};
1192    /// use questdb::ingress::TimestampMicros;
1193    /// # fn main() -> Result<()> {
1194    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1195    /// # let mut buffer = sender.new_buffer();
1196    /// # buffer.table("x")?;
1197    /// buffer.column_ts("col_name", TimestampMicros::now())?;
1198    /// # Ok(())
1199    /// # }
1200    /// ```
1201    ///
1202    /// or
1203    ///
1204    /// ```no_run
1205    /// # use questdb::Result;
1206    /// # use questdb::ingress::{Buffer, SenderBuilder};
1207    /// use questdb::ingress::TimestampMicros;
1208    ///
1209    /// # fn main() -> Result<()> {
1210    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1211    /// # let mut buffer = sender.new_buffer();
1212    /// # buffer.table("x")?;
1213    /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1214    /// # Ok(())
1215    /// # }
1216    /// ```
1217    ///
1218    /// or
1219    ///
1220    /// ```no_run
1221    /// # use questdb::Result;
1222    /// # use questdb::ingress::{Buffer, SenderBuilder};
1223    /// use questdb::ingress::TimestampMicros;
1224    /// use questdb::ingress::ColumnName;
1225    ///
1226    /// # fn main() -> Result<()> {
1227    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1228    /// # let mut buffer = sender.new_buffer();
1229    /// # buffer.table("x")?;
1230    /// let col_name = ColumnName::new("col_name")?;
1231    /// buffer.column_ts(col_name, TimestampMicros::now())?;
1232    /// # Ok(())
1233    /// # }
1234    /// ```
1235    ///
1236    /// or you can also pass in a `TimestampNanos`.
1237    ///
1238    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1239    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1240    ///
1241    /// This last option requires the `chrono_timestamp` feature.
1242    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1243    where
1244        N: TryInto<ColumnName<'a>>,
1245        T: TryInto<Timestamp>,
1246        Error: From<N::Error>,
1247        Error: From<T::Error>,
1248    {
1249        self.write_column_key(name)?;
1250        let timestamp: Timestamp = value.try_into()?;
1251        let (number, suffix) = match (self.protocol_version, timestamp) {
1252            (ProtocolVersion::V1, _) => {
1253                let timestamp: TimestampMicros = timestamp.try_into()?;
1254                (timestamp.as_i64(), b't')
1255            }
1256            (_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'),
1257            (_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'),
1258        };
1259
1260        let mut buf = itoa::Buffer::new();
1261        let printed = buf.format(number);
1262        self.output.extend_from_slice(printed.as_bytes());
1263        self.output.push(suffix);
1264        Ok(self)
1265    }
1266
1267    /// Complete the current row with the designated timestamp. After this call, you can
1268    /// start recording the next row by calling [Buffer::table] again, or you can send
1269    /// the accumulated batch by calling [Sender::flush] or one of its variants.
1270    ///
1271    /// ```no_run
1272    /// # use questdb::Result;
1273    /// # use questdb::ingress::{Buffer, SenderBuilder};
1274    /// use questdb::ingress::TimestampNanos;
1275    /// # fn main() -> Result<()> {
1276    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1277    /// # let mut buffer = sender.new_buffer();
1278    /// # buffer.table("x")?.symbol("a", "b")?;
1279    /// buffer.at(TimestampNanos::now())?;
1280    /// # Ok(())
1281    /// # }
1282    /// ```
1283    ///
1284    /// or
1285    ///
1286    /// ```no_run
1287    /// # use questdb::Result;
1288    /// # use questdb::ingress::{Buffer, SenderBuilder};
1289    /// use questdb::ingress::TimestampNanos;
1290    ///
1291    /// # fn main() -> Result<()> {
1292    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1293    /// # let mut buffer = sender.new_buffer();
1294    /// # buffer.table("x")?.symbol("a", "b")?;
1295    /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1296    /// # Ok(())
1297    /// # }
1298    /// ```
1299    ///
1300    /// You can also pass in a `TimestampMicros`.
1301    ///
1302    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1303    /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1304    ///
1305    pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1306    where
1307        T: TryInto<Timestamp>,
1308        Error: From<T::Error>,
1309    {
1310        self.check_op(Op::At)?;
1311        let timestamp: Timestamp = timestamp.try_into()?;
1312
1313        let (number, termination) = match (self.protocol_version, timestamp) {
1314            (ProtocolVersion::V1, _) => {
1315                let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1316                (timestamp?.as_i64(), "\n")
1317            }
1318            (_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"),
1319            (_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"),
1320        };
1321
1322        if number < 0 {
1323            return Err(error::fmt!(
1324                InvalidTimestamp,
1325                "Timestamp {} is negative. It must be >= 0.",
1326                number
1327            ));
1328        }
1329
1330        let mut buf = itoa::Buffer::new();
1331        let printed = buf.format(number);
1332        self.output.push(b' ');
1333        self.output.extend_from_slice(printed.as_bytes());
1334        self.output.extend_from_slice(termination.as_bytes());
1335        self.state.op_case = OpCase::MayFlushOrTable;
1336        self.state.row_count += 1;
1337        Ok(())
1338    }
1339
1340    /// Complete the current row without providing a timestamp. The QuestDB instance
1341    /// will insert its own timestamp.
1342    ///
1343    /// Letting the server assign the timestamp can be faster since it reliably avoids
1344    /// out-of-order operations in the database for maximum ingestion throughput. However,
1345    /// it removes the ability to deduplicate rows.
1346    ///
1347    /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1348    /// server will set the timestamp only after receiving the row. If you're flushing
1349    /// infrequently, the server-assigned timestamp may be significantly behind the
1350    /// time the data was recorded in the buffer.
1351    ///
1352    /// In almost all cases, you should prefer the [Buffer::at] function.
1353    ///
1354    /// After this call, you can start recording the next row by calling [Buffer::table]
1355    /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1356    /// its variants.
1357    ///
1358    /// ```no_run
1359    /// # use questdb::Result;
1360    /// # use questdb::ingress::{Buffer, SenderBuilder};
1361    /// # fn main() -> Result<()> {
1362    /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1363    /// # let mut buffer = sender.new_buffer();
1364    /// # buffer.table("x")?.symbol("a", "b")?;
1365    /// buffer.at_now()?;
1366    /// # Ok(())
1367    /// # }
1368    /// ```
1369    pub fn at_now(&mut self) -> crate::Result<()> {
1370        self.check_op(Op::At)?;
1371        self.output.push(b'\n');
1372        self.state.op_case = OpCase::MayFlushOrTable;
1373        self.state.row_count += 1;
1374        Ok(())
1375    }
1376}
1377
1378impl Debug for Buffer {
1379    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1380        f.debug_struct("Buffer")
1381            .field("output", &DebugBytes(&self.output))
1382            .field("state", &self.state)
1383            .field("marker", &self.marker)
1384            .field("max_name_len", &self.max_name_len)
1385            .field("protocol_version", &self.protocol_version)
1386            .finish()
1387    }
1388}