questdb/ingress/buffer.rs
1/*******************************************************************************
2 * ___ _ ____ ____
3 * / _ \ _ _ ___ ___| |_| _ \| __ )
4 * | | | | | | |/ _ \/ __| __| | | | _ \
5 * | |_| | |_| | __/\__ \ |_| |_| | |_) |
6 * \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 * Copyright (c) 2014-2019 Appsicle
9 * Copyright (c) 2019-2025 QuestDB
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 *
23 ******************************************************************************/
24
25use crate::ingress::decimal::DecimalView;
26use crate::ingress::ndarr::{ArrayElementSealed, check_and_get_array_bytes_size};
27use crate::ingress::{
28 ARRAY_BINARY_FORMAT_TYPE, ArrayElement, DOUBLE_BINARY_FORMAT_TYPE, DebugBytes, MAX_ARRAY_DIMS,
29 MAX_NAME_LEN_DEFAULT, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros, TimestampNanos,
30 ndarr,
31};
32use crate::{Error, error};
33use std::fmt::{Debug, Formatter};
34use std::num::NonZeroUsize;
35use std::slice::from_raw_parts_mut;
36
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` once before and once after the escaped
/// payload, which lets a caller wrap the value in delimiters (or do nothing).
///
/// The string is inspected byte by byte, so `check_escape_fn` receives the
/// individual bytes of any multi-byte UTF-8 sequence.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // First pass: count the escapes so the slow path can reserve exactly once.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        // Fast path: nothing to escape, bulk-copy the bytes.
        output.extend_from_slice(s.as_bytes());
    } else {
        // One backslash is added per escaped byte. Reserving the full size up
        // front means the pushes below do not need to reallocate, and using
        // `push` keeps the vector's length in step with its initialised
        // contents without any `unsafe`.
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
76
/// Tells whether byte `c` needs a backslash in front of it when written as an
/// unquoted token: space, comma, equals sign, line feed, carriage return or
/// backslash.
fn must_escape_unquoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b" ,=\n\r\\";
    SPECIAL.contains(&c)
}
80
/// Tells whether byte `c` needs a backslash in front of it when written
/// inside a double-quoted value: line feed, carriage return, double quote or
/// backslash.
fn must_escape_quoted(c: u8) -> bool {
    const SPECIAL: &[u8] = b"\n\r\"\\";
    SPECIAL.contains(&c)
}
84
85fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
86 write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
87}
88
89fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
90 write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
91}
92
93pub(crate) struct F64Serializer {
94 buf: ryu::Buffer,
95 n: f64,
96}
97
98impl F64Serializer {
99 pub(crate) fn new(n: f64) -> Self {
100 F64Serializer {
101 buf: ryu::Buffer::new(),
102 n,
103 }
104 }
105
106 // This function was taken and customized from the ryu crate.
107 #[cold]
108 fn format_nonfinite(&self) -> &'static str {
109 const MANTISSA_MASK: u64 = 0x000fffffffffffff;
110 const SIGN_MASK: u64 = 0x8000000000000000;
111 let bits = self.n.to_bits();
112 if bits & MANTISSA_MASK != 0 {
113 "NaN"
114 } else if bits & SIGN_MASK != 0 {
115 "-Infinity"
116 } else {
117 "Infinity"
118 }
119 }
120
121 pub(crate) fn as_str(&mut self) -> &str {
122 if self.n.is_finite() {
123 self.buf.format_finite(self.n)
124 } else {
125 self.format_nonfinite()
126 }
127 }
128}
129
/// An API operation on the buffer.
///
/// Every variant occupies its own bit so that a set of operations can be
/// expressed as a bitmask.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 1 << 0,
    Symbol = 1 << 1,
    Column = 1 << 2,
    At = 1 << 3,
    Flush = 1 << 4,
}

impl Op {
    /// Lower-case name of the operation, as used in error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Table => "table",
            Self::Symbol => "symbol",
            Self::Column => "column",
            Self::At => "at",
            Self::Flush => "flush",
        }
    }
}
150
151#[derive(Debug, Copy, Clone, PartialEq)]
152enum OpCase {
153 Init = Op::Table as isize,
154 TableWritten = Op::Symbol as isize | Op::Column as isize,
155 SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
156 ColumnWritten = Op::Column as isize | Op::At as isize,
157 MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
158}
159
160impl OpCase {
161 fn next_op_descr(self) -> &'static str {
162 match self {
163 OpCase::Init => "should have called `table` instead",
164 OpCase::TableWritten => "should have called `symbol` or `column` instead",
165 OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
166 OpCase::ColumnWritten => "should have called `column` or `at` instead",
167 OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
168 }
169 }
170}
171
172// IMPORTANT: This struct MUST remain `Copy` to ensure that
173// there are no heap allocations when performing marker operations.
174#[derive(Debug, Clone, Copy)]
175struct BufferState {
176 op_case: OpCase,
177 row_count: usize,
178 first_table_len: Option<NonZeroUsize>,
179 transactional: bool,
180}
181
182impl BufferState {
183 fn new() -> Self {
184 Self {
185 op_case: OpCase::Init,
186 row_count: 0,
187 first_table_len: None,
188 transactional: true,
189 }
190 }
191}
192
193/// A validated table name.
194///
195/// This type simply wraps a `&str`.
196///
197/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
198/// it doesn't have to validate it again. This saves CPU cycles.
199#[derive(Clone, Copy)]
200pub struct TableName<'a> {
201 name: &'a str,
202}
203
204impl<'a> TableName<'a> {
205 /// Construct a validated table name.
206 pub fn new(name: &'a str) -> crate::Result<Self> {
207 if name.is_empty() {
208 return Err(error::fmt!(
209 InvalidName,
210 "Table names must have a non-zero length."
211 ));
212 }
213
214 let mut prev = '\0';
215 for (index, c) in name.chars().enumerate() {
216 match c {
217 '.' => {
218 if index == 0 || index == name.len() - 1 || prev == '.' {
219 return Err(error::fmt!(
220 InvalidName,
221 concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
222 name,
223 index
224 ));
225 }
226 }
227 '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
228 | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
229 | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
230 | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
231 return Err(error::fmt!(
232 InvalidName,
233 concat!(
234 "Bad string {:?}: ",
235 "Table names can't contain ",
236 "a {:?} character, which was found at ",
237 "byte position {}."
238 ),
239 name,
240 c,
241 index
242 ));
243 }
244 '\u{feff}' => {
245 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
246 // aka UTF-8 BOM if it appears anywhere in the string.
247 return Err(error::fmt!(
248 InvalidName,
249 concat!(
250 "Bad string {:?}: ",
251 "Table names can't contain ",
252 "a UTF-8 BOM character, which was found at ",
253 "byte position {}."
254 ),
255 name,
256 index
257 ));
258 }
259 _ => (),
260 }
261 prev = c;
262 }
263
264 Ok(Self { name })
265 }
266
267 /// Construct a table name without validating it.
268 ///
269 /// This breaks API encapsulation and is only intended for use
270 /// when the string was already previously validated.
271 ///
272 /// The QuestDB server will reject an invalid table name.
273 pub fn new_unchecked(name: &'a str) -> Self {
274 Self { name }
275 }
276}
277
278impl<'a> TryFrom<&'a str> for TableName<'a> {
279 type Error = Error;
280
281 fn try_from(name: &'a str) -> crate::Result<Self> {
282 Self::new(name)
283 }
284}
285
286impl AsRef<str> for TableName<'_> {
287 fn as_ref(&self) -> &str {
288 self.name
289 }
290}
291
292/// A validated column name.
293///
294/// This type simply wraps a `&str`.
295///
296/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
297/// it doesn't have to validate it again. This saves CPU cycles.
298#[derive(Clone, Copy)]
299pub struct ColumnName<'a> {
300 name: &'a str,
301}
302
303impl<'a> ColumnName<'a> {
304 /// Construct a validated table name.
305 pub fn new(name: &'a str) -> crate::Result<Self> {
306 if name.is_empty() {
307 return Err(error::fmt!(
308 InvalidName,
309 "Column names must have a non-zero length."
310 ));
311 }
312
313 for (index, c) in name.chars().enumerate() {
314 match c {
315 '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
316 | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
317 | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
318 | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
319 return Err(error::fmt!(
320 InvalidName,
321 concat!(
322 "Bad string {:?}: ",
323 "Column names can't contain ",
324 "a {:?} character, which was found at ",
325 "byte position {}."
326 ),
327 name,
328 c,
329 index
330 ));
331 }
332 '\u{FEFF}' => {
333 // Reject Unicode char 'ZERO WIDTH NO-BREAK SPACE',
334 // aka UTF-8 BOM if it appears anywhere in the string.
335 return Err(error::fmt!(
336 InvalidName,
337 concat!(
338 "Bad string {:?}: ",
339 "Column names can't contain ",
340 "a UTF-8 BOM character, which was found at ",
341 "byte position {}."
342 ),
343 name,
344 index
345 ));
346 }
347 _ => (),
348 }
349 }
350
351 Ok(Self { name })
352 }
353
354 /// Construct a column name without validating it.
355 ///
356 /// This breaks API encapsulation and is only intended for use
357 /// when the string was already previously validated.
358 ///
359 /// The QuestDB server will reject an invalid column name.
360 pub fn new_unchecked(name: &'a str) -> Self {
361 Self { name }
362 }
363}
364
365impl<'a> TryFrom<&'a str> for ColumnName<'a> {
366 type Error = Error;
367
368 fn try_from(name: &'a str) -> crate::Result<Self> {
369 Self::new(name)
370 }
371}
372
373impl AsRef<str> for ColumnName<'_> {
374 fn as_ref(&self) -> &str {
375 self.name
376 }
377}
378
379/// A reusable buffer to prepare a batch of ILP messages.
380///
381/// # Example
382///
383/// ```no_run
384/// # use questdb::Result;
385/// # use questdb::ingress::SenderBuilder;
386///
387/// # fn main() -> Result<()> {
388/// # let mut sender = SenderBuilder::from_conf("http::addr=localhost:9000;")?.build()?;
389/// # use questdb::Result;
390/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
391/// let mut buffer = sender.new_buffer();
392///
393/// // first row
394/// buffer
395/// .table("table1")?
396/// .symbol("bar", "baz")?
397/// .column_bool("a", false)?
398/// .column_i64("b", 42)?
399/// .column_f64("c", 3.14)?
400/// .column_str("d", "hello")?
401/// .column_ts("e", TimestampMicros::now())?
402/// .at(TimestampNanos::now())?;
403///
404/// // second row
405/// buffer
406/// .table("table2")?
407/// .symbol("foo", "bar")?
408/// .at(TimestampNanos::now())?;
409/// # Ok(())
410/// # }
411/// ```
412///
413/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
414///
415/// # Sequential Coupling
416/// The Buffer API is sequentially coupled:
417/// * A row always starts with [`table`](Buffer::table).
418/// * A row must contain at least one [`symbol`](Buffer::symbol) or
419/// column (
420/// [`column_bool`](Buffer::column_bool),
421/// [`column_i64`](Buffer::column_i64),
422/// [`column_f64`](Buffer::column_f64),
423/// [`column_str`](Buffer::column_str),
424/// [`column_arr`](Buffer::column_arr),
425/// [`column_ts`](Buffer::column_ts)).
426/// * Symbols must appear before columns.
427/// * A row must be terminated with either [`at`](Buffer::at) or
428/// [`at_now`](Buffer::at_now).
429///
430/// This diagram visualizes the sequence:
431///
432/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
433///
434/// # Buffer method calls, Serialized ILP types and QuestDB types
435///
436/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
437/// |---------------|--------------------------------------------------------------|
438/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
439/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
440/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
441/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
442/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
443/// | [`column_arr`](Buffer::column_arr) | [`ARRAY`](https://questdb.io/docs/reference/api/ilp/columnset-types#array) |
444/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
445///
446/// QuestDB supports both `STRING` and `SYMBOL` column types.
447///
448/// To understand the difference, refer to the
449/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
450/// symbols are interned strings, most suitable for identifiers that are repeated many
451/// times throughout the column. They offer an advantage in storage space and query
452/// performance.
453///
454/// # Inserting NULL values
455///
456/// To insert a NULL value, skip the symbol or column for that row.
457///
458/// # Recovering from validation errors
459///
460/// If you want to recover from potential validation errors, call
461/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
462/// append as many rows or parts of rows as you like, and then call
463/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
464///
465/// If there was an error in one of the rows, use
466/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
467/// marked last known good state.
468///
#[derive(Clone)]
pub struct Buffer {
    /// Serialized bytes accumulated so far.
    output: Vec<u8>,
    /// Call-sequence state plus row bookkeeping for the current contents.
    state: BufferState,
    /// Rewind point: the `output` length and `state` captured by `set_marker`.
    marker: Option<(usize, BufferState)>,
    /// Upper bound applied to the byte length of table and column names.
    max_name_len: usize,
    /// Protocol version the buffer serializes for.
    protocol_version: ProtocolVersion,
}
477
478impl Buffer {
479 /// Creates a new [`Buffer`] with default parameters.
480 ///
481 /// - Uses the specified protocol version
482 /// - Sets maximum name length to **127 characters** (QuestDB server default)
483 ///
484 /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
485 /// and [`Sender::max_name_len`] is 127.
486 ///
487 /// For custom name lengths, use [`Self::with_max_name_len`]
488 pub fn new(protocol_version: ProtocolVersion) -> Self {
489 Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
490 }
491
492 /// Creates a new [`Buffer`] with a custom maximum name length.
493 ///
494 /// - `max_name_len`: Maximum allowed length for table/column names, match
495 /// your QuestDB server's `cairo.max.file.name.length` configuration
496 /// - `protocol_version`: Protocol version to use
497 ///
498 /// This is equivalent to [`Sender::new_buffer`] when using the [`Sender::protocol_version`]
499 /// and [`Sender::max_name_len`].
500 ///
501 /// For the default max name length limit (127), use [`Self::new`].
502 pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
503 Self {
504 output: Vec::new(),
505 state: BufferState::new(),
506 marker: None,
507 max_name_len,
508 protocol_version,
509 }
510 }
511
    /// The protocol version this buffer was created with.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.protocol_version
    }
515
    /// Pre-allocate to ensure the buffer has enough capacity for at least the
    /// specified additional byte count. This may be rounded up.
    /// This does not allocate if such additional capacity is already satisfied.
    ///
    /// See also [`capacity`](Buffer::capacity).
    pub fn reserve(&mut self, additional: usize) {
        self.output.reserve(additional);
    }
523
    /// The number of bytes accumulated in the buffer.
    ///
    /// This is the length of the slice returned by
    /// [`as_bytes`](Buffer::as_bytes).
    pub fn len(&self) -> usize {
        self.output.len()
    }
528
    /// The number of rows accumulated in the buffer.
    ///
    /// The count goes back to zero when the buffer is
    /// [cleared](Buffer::clear).
    pub fn row_count(&self) -> usize {
        self.state.row_count
    }
533
    /// Tells whether the buffer is transactional. It is transactional iff it contains
    /// data for at most one table. Additionally, you must send the buffer over HTTP to
    /// get transactional behavior.
    ///
    /// A new or [cleared](Buffer::clear) buffer starts out transactional.
    pub fn transactional(&self) -> bool {
        self.state.transactional
    }
540
    /// Tells whether the buffer holds no bytes at all.
    pub fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
544
    /// The total number of bytes the buffer can hold before it needs to resize.
    ///
    /// Use [`reserve`](Buffer::reserve) to grow it ahead of time.
    pub fn capacity(&self) -> usize {
        self.output.capacity()
    }
549
550 pub fn as_bytes(&self) -> &[u8] {
551 &self.output
552 }
553
554 /// Mark a rewind point.
555 /// This allows undoing accumulated changes to the buffer for one or more
556 /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
557 /// Any previous marker will be discarded.
558 /// Once the marker is no longer needed, call
559 /// [`clear_marker`](Buffer::clear_marker).
560 pub fn set_marker(&mut self) -> crate::Result<()> {
561 if (self.state.op_case as isize & Op::Table as isize) == 0 {
562 return Err(error::fmt!(
563 InvalidApiCall,
564 concat!(
565 "Can't set the marker whilst constructing a line. ",
566 "A marker may only be set on an empty buffer or after ",
567 "`at` or `at_now` is called."
568 )
569 ));
570 }
571 self.marker = Some((self.output.len(), self.state));
572 Ok(())
573 }
574
575 /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
576 /// call.
577 ///
578 /// As a side effect, this also clears the marker.
579 pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
580 if let Some((position, state)) = self.marker.take() {
581 self.output.truncate(position);
582 self.state = state;
583 Ok(())
584 } else {
585 Err(error::fmt!(
586 InvalidApiCall,
587 "Can't rewind to the marker: No marker set."
588 ))
589 }
590 }
591
    /// Discard any marker as may have been set by
    /// [`set_marker`](Buffer::set_marker).
    ///
    /// The accumulated bytes and the buffer state are left untouched.
    ///
    /// Idempotent.
    pub fn clear_marker(&mut self) {
        self.marker = None;
    }
599
600 /// Reset the buffer and clear contents whilst retaining
601 /// [`capacity`](Buffer::capacity).
602 pub fn clear(&mut self) {
603 self.output.clear();
604 self.state = BufferState::new();
605 self.marker = None;
606 }
607
608 /// Check if the next API operation is allowed as per the OP case state machine.
609 #[inline(always)]
610 fn check_op(&self, op: Op) -> crate::Result<()> {
611 if (self.state.op_case as isize & op as isize) > 0 {
612 Ok(())
613 } else {
614 Err(error::fmt!(
615 InvalidApiCall,
616 "State error: Bad call to `{}`, {}.",
617 op.descr(),
618 self.state.op_case.next_op_descr()
619 ))
620 }
621 }
622
    /// Checks if this buffer is ready to be flushed to a sender via one of the
    /// [`Sender::flush`] functions. An [`Ok`] value indicates that the buffer
    /// is ready to be flushed via a [`Sender`] while an [`Err`] will contain a
    /// message indicating why this [`Buffer`] cannot be flushed at the moment.
    #[inline(always)]
    pub fn check_can_flush(&self) -> crate::Result<()> {
        // Of the `OpCase` states, only `MayFlushOrTable` includes `Op::Flush`.
        self.check_op(Op::Flush)
    }
631
632 #[inline(always)]
633 fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
634 if name.len() > self.max_name_len {
635 return Err(error::fmt!(
636 InvalidName,
637 "Bad name: {:?}: Too long (max {} characters)",
638 name,
639 self.max_name_len
640 ));
641 }
642 Ok(())
643 }
644
645 /// Begin recording a new row for the given table.
646 ///
647 /// ```no_run
648 /// # use questdb::Result;
649 /// # use questdb::ingress::{Buffer, SenderBuilder};
650 /// # fn main() -> Result<()> {
651 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
652 /// # let mut buffer = sender.new_buffer();
653 /// buffer.table("table_name")?;
654 /// # Ok(())
655 /// # }
656 /// ```
657 ///
658 /// or
659 ///
660 /// ```no_run
661 /// # use questdb::Result;
662 /// # use questdb::ingress::{Buffer, SenderBuilder};
663 /// use questdb::ingress::TableName;
664 ///
665 /// # fn main() -> Result<()> {
666 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
667 /// # let mut buffer = sender.new_buffer();
668 /// let table_name = TableName::new("table_name")?;
669 /// buffer.table(table_name)?;
670 /// # Ok(())
671 /// # }
672 /// ```
673 pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
674 where
675 N: TryInto<TableName<'a>>,
676 Error: From<N::Error>,
677 {
678 let name: TableName<'a> = name.try_into()?;
679 self.validate_max_name_len(name.name)?;
680 self.check_op(Op::Table)?;
681 let table_begin = self.output.len();
682 write_escaped_unquoted(&mut self.output, name.name);
683 let table_end = self.output.len();
684 self.state.op_case = OpCase::TableWritten;
685
686 // A buffer stops being transactional if it targets multiple tables.
687 if let Some(first_table_len) = &self.state.first_table_len {
688 let first_table = &self.output[0..first_table_len.get()];
689 let this_table = &self.output[table_begin..table_end];
690 if first_table != this_table {
691 self.state.transactional = false;
692 }
693 } else {
694 debug_assert!(table_begin == 0);
695
696 // This is a bit confusing, so worth explaining:
697 // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
698 // but we know that `table_end` is never 0 here, we just need an option type
699 // anyway, so we don't bother unwrapping it to then wrap it again.
700 let first_table_len = NonZeroUsize::new(table_end);
701
702 // Instead we just assert that it's `Some`.
703 debug_assert!(first_table_len.is_some());
704
705 self.state.first_table_len = first_table_len;
706 }
707 Ok(self)
708 }
709
710 /// Record a symbol for the given column.
711 /// Make sure you record all symbol columns before any other column type.
712 ///
713 /// ```no_run
714 /// # use questdb::Result;
715 /// # use questdb::ingress::{Buffer, SenderBuilder};
716 /// # fn main() -> Result<()> {
717 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
718 /// # let mut buffer = sender.new_buffer();
719 /// # buffer.table("x")?;
720 /// buffer.symbol("col_name", "value")?;
721 /// # Ok(())
722 /// # }
723 /// ```
724 ///
725 /// or
726 ///
727 /// ```no_run
728 /// # use questdb::Result;
729 /// # use questdb::ingress::{Buffer, SenderBuilder};
730 /// # fn main() -> Result<()> {
731 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
732 /// # let mut buffer = sender.new_buffer();
733 /// # buffer.table("x")?;
734 /// let value: String = "value".to_owned();
735 /// buffer.symbol("col_name", value)?;
736 /// # Ok(())
737 /// # }
738 /// ```
739 ///
740 /// or
741 ///
742 /// ```no_run
743 /// # use questdb::Result;
744 /// # use questdb::ingress::{Buffer, SenderBuilder};
745 /// use questdb::ingress::ColumnName;
746 ///
747 /// # fn main() -> Result<()> {
748 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
749 /// # let mut buffer = sender.new_buffer();
750 /// # buffer.table("x")?;
751 /// let col_name = ColumnName::new("col_name")?;
752 /// buffer.symbol(col_name, "value")?;
753 /// # Ok(())
754 /// # }
755 /// ```
756 ///
757 pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
758 where
759 N: TryInto<ColumnName<'a>>,
760 S: AsRef<str>,
761 Error: From<N::Error>,
762 {
763 let name: ColumnName<'a> = name.try_into()?;
764 self.validate_max_name_len(name.name)?;
765 self.check_op(Op::Symbol)?;
766 self.output.push(b',');
767 write_escaped_unquoted(&mut self.output, name.name);
768 self.output.push(b'=');
769 write_escaped_unquoted(&mut self.output, value.as_ref());
770 self.state.op_case = OpCase::SymbolWritten;
771 Ok(self)
772 }
773
774 fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
775 where
776 N: TryInto<ColumnName<'a>>,
777 Error: From<N::Error>,
778 {
779 let name: ColumnName<'a> = name.try_into()?;
780 self.validate_max_name_len(name.name)?;
781 self.check_op(Op::Column)?;
782 self.output
783 .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
784 b' '
785 } else {
786 b','
787 });
788 write_escaped_unquoted(&mut self.output, name.name);
789 self.output.push(b'=');
790 self.state.op_case = OpCase::ColumnWritten;
791 Ok(self)
792 }
793
794 /// Record a boolean value for the given column.
795 ///
796 /// ```no_run
797 /// # use questdb::Result;
798 /// # use questdb::ingress::{Buffer, SenderBuilder};
799 /// # fn main() -> Result<()> {
800 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
801 /// # let mut buffer = sender.new_buffer();
802 /// # buffer.table("x")?;
803 /// buffer.column_bool("col_name", true)?;
804 /// # Ok(())
805 /// # }
806 /// ```
807 ///
808 /// or
809 ///
810 /// ```no_run
811 /// # use questdb::Result;
812 /// # use questdb::ingress::{Buffer, SenderBuilder};
813 /// use questdb::ingress::ColumnName;
814 ///
815 /// # fn main() -> Result<()> {
816 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
817 /// # let mut buffer = sender.new_buffer();
818 /// # buffer.table("x")?;
819 /// let col_name = ColumnName::new("col_name")?;
820 /// buffer.column_bool(col_name, true)?;
821 /// # Ok(())
822 /// # }
823 /// ```
824 pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
825 where
826 N: TryInto<ColumnName<'a>>,
827 Error: From<N::Error>,
828 {
829 self.write_column_key(name)?;
830 self.output.push(if value { b't' } else { b'f' });
831 Ok(self)
832 }
833
834 /// Record an integer value for the given column.
835 ///
836 /// ```no_run
837 /// # use questdb::Result;
838 /// # use questdb::ingress::{Buffer, SenderBuilder};
839 /// # fn main() -> Result<()> {
840 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
841 /// # let mut buffer = sender.new_buffer();
842 /// # buffer.table("x")?;
843 /// buffer.column_i64("col_name", 42)?;
844 /// # Ok(())
845 /// # }
846 /// ```
847 ///
848 /// or
849 ///
850 /// ```no_run
851 /// # use questdb::Result;
852 /// # use questdb::ingress::{Buffer, SenderBuilder};
853 /// use questdb::ingress::ColumnName;
854 ///
855 /// # fn main() -> Result<()> {
856 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
857 /// # let mut buffer = sender.new_buffer();
858 /// # buffer.table("x")?;
859 /// let col_name = ColumnName::new("col_name")?;
860 /// buffer.column_i64(col_name, 42)?;
861 /// # Ok(())
862 /// # }
863 /// ```
864 pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
865 where
866 N: TryInto<ColumnName<'a>>,
867 Error: From<N::Error>,
868 {
869 self.write_column_key(name)?;
870 let mut buf = itoa::Buffer::new();
871 let printed = buf.format(value);
872 self.output.extend_from_slice(printed.as_bytes());
873 self.output.push(b'i');
874 Ok(self)
875 }
876
877 /// Record a floating point value for the given column.
878 ///
879 /// ```no_run
880 /// # use questdb::Result;
881 /// # use questdb::ingress::{Buffer, SenderBuilder};
882 /// # fn main() -> Result<()> {
883 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
884 /// # let mut buffer = sender.new_buffer();
885 /// # buffer.table("x")?;
886 /// buffer.column_f64("col_name", 3.14)?;
887 /// # Ok(())
888 /// # }
889 /// ```
890 ///
891 /// or
892 ///
893 /// ```no_run
894 /// # use questdb::Result;
895 /// # use questdb::ingress::{Buffer, SenderBuilder};
896 /// use questdb::ingress::ColumnName;
897 ///
898 /// # fn main() -> Result<()> {
899 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
900 /// # let mut buffer = sender.new_buffer();
901 /// # buffer.table("x")?;
902 /// let col_name = ColumnName::new("col_name")?;
903 /// buffer.column_f64(col_name, 3.14)?;
904 /// # Ok(())
905 /// # }
906 /// ```
907 pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
908 where
909 N: TryInto<ColumnName<'a>>,
910 Error: From<N::Error>,
911 {
912 self.write_column_key(name)?;
913 if !matches!(self.protocol_version, ProtocolVersion::V1) {
914 self.output.push(b'=');
915 self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
916 self.output.extend_from_slice(&value.to_le_bytes())
917 } else {
918 let mut ser = F64Serializer::new(value);
919 self.output.extend_from_slice(ser.as_str().as_bytes())
920 }
921 Ok(self)
922 }
923
924 /// Record a string value for the given column.
925 ///
926 /// ```no_run
927 /// # use questdb::Result;
928 /// # use questdb::ingress::{Buffer, SenderBuilder};
929 /// # fn main() -> Result<()> {
930 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
931 /// # let mut buffer = sender.new_buffer();
932 /// # buffer.table("x")?;
933 /// buffer.column_str("col_name", "value")?;
934 /// # Ok(())
935 /// # }
936 /// ```
937 ///
938 /// or
939 ///
940 /// ```no_run
941 /// # use questdb::Result;
942 /// # use questdb::ingress::{Buffer, SenderBuilder};
943 /// # fn main() -> Result<()> {
944 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
945 /// # let mut buffer = sender.new_buffer();
946 /// # buffer.table("x")?;
947 /// let value: String = "value".to_owned();
948 /// buffer.column_str("col_name", value)?;
949 /// # Ok(())
950 /// # }
951 /// ```
952 ///
953 /// or
954 ///
955 /// ```no_run
956 /// # use questdb::Result;
957 /// # use questdb::ingress::{Buffer, SenderBuilder};
958 /// use questdb::ingress::ColumnName;
959 ///
960 /// # fn main() -> Result<()> {
961 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
962 /// # let mut buffer = sender.new_buffer();
963 /// # buffer.table("x")?;
964 /// let col_name = ColumnName::new("col_name")?;
965 /// buffer.column_str(col_name, "value")?;
966 /// # Ok(())
967 /// # }
968 /// ```
969 pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
970 where
971 N: TryInto<ColumnName<'a>>,
972 S: AsRef<str>,
973 Error: From<N::Error>,
974 {
975 self.write_column_key(name)?;
976 write_escaped_quoted(&mut self.output, value.as_ref());
977 Ok(self)
978 }
979
980 /// Record a decimal value for the given column.
981 ///
982 /// ```no_run
983 /// # use questdb::Result;
984 /// # use questdb::ingress::{Buffer, SenderBuilder};
985 /// # fn main() -> Result<()> {
986 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
987 /// # let mut buffer = sender.new_buffer();
988 /// # buffer.table("x")?;
989 /// buffer.column_dec("col_name", "123.45")?;
990 /// # Ok(())
991 /// # }
992 /// ```
993 ///
    /// When specifying a decimal as a string, use a `.` to separate the whole part
    /// from the fractional part, for example `"12.20"`.
    /// Infinity is encoded as `"+Infinity"` or `"-Infinity"`, and NaN as `"NaN"`.
    /// Note that Infinity and NaN values decay to nulls when stored in the database.
    ///
    /// or
999 ///
1000 /// ```no_run
1001 /// # use questdb::Result;
1002 /// # use questdb::ingress::{Buffer, SenderBuilder};
1003 /// use questdb::ingress::ColumnName;
1004 ///
1005 /// # fn main() -> Result<()> {
1006 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1007 /// # let mut buffer = sender.new_buffer();
1008 /// # buffer.table("x")?;
1009 /// let col_name = ColumnName::new("col_name")?;
1010 /// buffer.column_dec(col_name, "123.45")?;
1011 /// # Ok(())
1012 /// # }
1013 /// ```
1014 ///
1015 /// With `rust_decimal` feature enabled:
1016 ///
1017 /// ```no_run
1018 /// # #[cfg(feature = "rust_decimal")]
1019 /// # {
1020 /// # use questdb::Result;
1021 /// # use questdb::ingress::{Buffer, SenderBuilder};
1022 /// use rust_decimal::Decimal;
1023 /// use std::str::FromStr;
1024 ///
1025 /// # fn main() -> Result<()> {
1026 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1027 /// # let mut buffer = sender.new_buffer();
1028 /// # buffer.table("x")?;
1029 /// let value = Decimal::from_str("123.45").unwrap();
1030 /// buffer.column_dec("col_name", &value)?;
1031 /// # Ok(())
1032 /// # }
1033 /// # }
1034 /// ```
1035 ///
1036 /// With `bigdecimal` feature enabled:
1037 ///
1038 /// ```no_run
1039 /// # #[cfg(feature = "bigdecimal")]
1040 /// # {
1041 /// # use questdb::Result;
1042 /// # use questdb::ingress::{Buffer, SenderBuilder};
1043 /// use bigdecimal::BigDecimal;
1044 /// use std::str::FromStr;
1045 ///
1046 /// # fn main() -> Result<()> {
1047 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1048 /// # let mut buffer = sender.new_buffer();
1049 /// # buffer.table("x")?;
1050 /// let value = BigDecimal::from_str("0.123456789012345678901234567890").unwrap();
1051 /// buffer.column_dec("col_name", &value)?;
1052 /// # Ok(())
1053 /// # }
1054 /// # }
1055 /// ```
1056 pub fn column_dec<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
1057 where
1058 N: TryInto<ColumnName<'a>>,
1059 S: TryInto<DecimalView<'a>>,
1060 Error: From<N::Error>,
1061 Error: From<S::Error>,
1062 {
1063 if self.protocol_version < ProtocolVersion::V3 {
1064 return Err(error::fmt!(
1065 ProtocolVersionError,
1066 "Protocol version {} does not support the decimal datatype",
1067 self.protocol_version
1068 ));
1069 }
1070
1071 let value: DecimalView = value.try_into()?;
1072 self.write_column_key(name)?;
1073 value.serialize(&mut self.output);
1074 Ok(self)
1075 }
1076
1077 /// Record a multidimensional array value for the given column.
1078 ///
1079 /// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must
1080 /// be of type `f64`, which is currently the only supported data type.
1081 ///
1082 /// **Note**: QuestDB server version 9.0.0 or later is required for array support.
1083 ///
1084 /// # Examples
1085 ///
1086 /// Recording a 2D array using slices:
1087 ///
1088 /// ```no_run
1089 /// # use questdb::Result;
1090 /// # use questdb::ingress::{Buffer, SenderBuilder};
1091 /// # fn main() -> Result<()> {
1092 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1093 /// # let mut buffer = sender.new_buffer();
1094 /// # buffer.table("x")?;
1095 /// let array_2d = vec![vec![1.1, 2.2], vec![3.3, 4.4]];
1096 /// buffer.column_arr("array_col", &array_2d)?;
1097 /// # Ok(())
1098 /// # }
1099 /// ```
1100 ///
1101 /// Recording a 3D array using vectors:
1102 ///
1103 /// ```no_run
1104 /// # use questdb::Result;
1105 /// # use questdb::ingress::{Buffer, ColumnName, SenderBuilder};
1106 /// # fn main() -> Result<()> {
1107 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1108 /// # let mut buffer = sender.new_buffer();
1109 /// # buffer.table("x1")?;
1110 /// let array_3d = vec![vec![vec![42.0; 4]; 3]; 2];
1111 /// let col_name = ColumnName::new("col1")?;
1112 /// buffer.column_arr(col_name, &array_3d)?;
1113 /// # Ok(())
1114 /// # }
1115 /// ```
1116 ///
1117 /// # Errors
1118 ///
1119 /// Returns [`Error`] if:
1120 /// - Array dimensions exceed [`MAX_ARRAY_DIMS`]
1121 /// - Failed to get dimension sizes
1122 /// - Column name validation fails
1123 /// - Protocol version v1 is used (arrays require v2+)
1124 #[allow(private_bounds)]
1125 pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
1126 where
1127 N: TryInto<ColumnName<'a>>,
1128 T: NdArrayView<D>,
1129 D: ArrayElement + ArrayElementSealed,
1130 Error: From<N::Error>,
1131 {
1132 if self.protocol_version < ProtocolVersion::V2 {
1133 return Err(error::fmt!(
1134 ProtocolVersionError,
1135 "Protocol version {} does not support array datatype",
1136 self.protocol_version
1137 ));
1138 }
1139 let ndim = view.ndim();
1140 if ndim == 0 {
1141 return Err(error::fmt!(
1142 ArrayError,
1143 "Zero-dimensional arrays are not supported",
1144 ));
1145 }
1146
1147 // check dimension less equal than max dims
1148 if MAX_ARRAY_DIMS < ndim {
1149 return Err(error::fmt!(
1150 ArrayError,
1151 "Array dimension mismatch: expected at most {} dimensions, but got {}",
1152 MAX_ARRAY_DIMS,
1153 ndim
1154 ));
1155 }
1156
1157 let array_buf_size = check_and_get_array_bytes_size(view)?;
1158 self.write_column_key(name)?;
1159 // binary format flag '='
1160 self.output.push(b'=');
1161 // binary format entity type
1162 self.output.push(ARRAY_BINARY_FORMAT_TYPE);
1163 // ndarr datatype
1164 self.output.push(D::type_tag());
1165 // ndarr dims
1166 self.output.push(ndim as u8);
1167
1168 let dim_header_size = size_of::<u32>() * ndim;
1169 self.output.reserve(dim_header_size + array_buf_size);
1170
1171 for i in 0..ndim {
1172 // ndarr shape
1173 self.output
1174 .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
1175 }
1176
1177 let index = self.output.len();
1178 let writeable =
1179 unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
1180
1181 // ndarr data
1182 ndarr::write_array_data(view, writeable, array_buf_size)?;
1183 unsafe { self.output.set_len(array_buf_size + index) }
1184 Ok(self)
1185 }
1186
1187 /// Record a timestamp value for the given column.
1188 ///
1189 /// ```no_run
1190 /// # use questdb::Result;
1191 /// # use questdb::ingress::{Buffer, SenderBuilder};
1192 /// use questdb::ingress::TimestampMicros;
1193 /// # fn main() -> Result<()> {
1194 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1195 /// # let mut buffer = sender.new_buffer();
1196 /// # buffer.table("x")?;
1197 /// buffer.column_ts("col_name", TimestampMicros::now())?;
1198 /// # Ok(())
1199 /// # }
1200 /// ```
1201 ///
1202 /// or
1203 ///
1204 /// ```no_run
1205 /// # use questdb::Result;
1206 /// # use questdb::ingress::{Buffer, SenderBuilder};
1207 /// use questdb::ingress::TimestampMicros;
1208 ///
1209 /// # fn main() -> Result<()> {
1210 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1211 /// # let mut buffer = sender.new_buffer();
1212 /// # buffer.table("x")?;
1213 /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1214 /// # Ok(())
1215 /// # }
1216 /// ```
1217 ///
1218 /// or
1219 ///
1220 /// ```no_run
1221 /// # use questdb::Result;
1222 /// # use questdb::ingress::{Buffer, SenderBuilder};
1223 /// use questdb::ingress::TimestampMicros;
1224 /// use questdb::ingress::ColumnName;
1225 ///
1226 /// # fn main() -> Result<()> {
1227 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1228 /// # let mut buffer = sender.new_buffer();
1229 /// # buffer.table("x")?;
1230 /// let col_name = ColumnName::new("col_name")?;
1231 /// buffer.column_ts(col_name, TimestampMicros::now())?;
1232 /// # Ok(())
1233 /// # }
1234 /// ```
1235 ///
1236 /// or you can also pass in a `TimestampNanos`.
1237 ///
1238 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1239 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1240 ///
1241 /// This last option requires the `chrono_timestamp` feature.
1242 pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
1243 where
1244 N: TryInto<ColumnName<'a>>,
1245 T: TryInto<Timestamp>,
1246 Error: From<N::Error>,
1247 Error: From<T::Error>,
1248 {
1249 self.write_column_key(name)?;
1250 let timestamp: Timestamp = value.try_into()?;
1251 let (number, suffix) = match (self.protocol_version, timestamp) {
1252 (ProtocolVersion::V1, _) => {
1253 let timestamp: TimestampMicros = timestamp.try_into()?;
1254 (timestamp.as_i64(), b't')
1255 }
1256 (_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'),
1257 (_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'),
1258 };
1259
1260 let mut buf = itoa::Buffer::new();
1261 let printed = buf.format(number);
1262 self.output.extend_from_slice(printed.as_bytes());
1263 self.output.push(suffix);
1264 Ok(self)
1265 }
1266
1267 /// Complete the current row with the designated timestamp. After this call, you can
1268 /// start recording the next row by calling [Buffer::table] again, or you can send
1269 /// the accumulated batch by calling [Sender::flush] or one of its variants.
1270 ///
1271 /// ```no_run
1272 /// # use questdb::Result;
1273 /// # use questdb::ingress::{Buffer, SenderBuilder};
1274 /// use questdb::ingress::TimestampNanos;
1275 /// # fn main() -> Result<()> {
1276 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1277 /// # let mut buffer = sender.new_buffer();
1278 /// # buffer.table("x")?.symbol("a", "b")?;
1279 /// buffer.at(TimestampNanos::now())?;
1280 /// # Ok(())
1281 /// # }
1282 /// ```
1283 ///
1284 /// or
1285 ///
1286 /// ```no_run
1287 /// # use questdb::Result;
1288 /// # use questdb::ingress::{Buffer, SenderBuilder};
1289 /// use questdb::ingress::TimestampNanos;
1290 ///
1291 /// # fn main() -> Result<()> {
1292 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1293 /// # let mut buffer = sender.new_buffer();
1294 /// # buffer.table("x")?.symbol("a", "b")?;
1295 /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1296 /// # Ok(())
1297 /// # }
1298 /// ```
1299 ///
1300 /// You can also pass in a `TimestampMicros`.
1301 ///
1302 /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1303 /// easily from either `std::time::SystemTime` or `chrono::DateTime`.
1304 ///
1305 pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
1306 where
1307 T: TryInto<Timestamp>,
1308 Error: From<T::Error>,
1309 {
1310 self.check_op(Op::At)?;
1311 let timestamp: Timestamp = timestamp.try_into()?;
1312
1313 let (number, termination) = match (self.protocol_version, timestamp) {
1314 (ProtocolVersion::V1, _) => {
1315 let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1316 (timestamp?.as_i64(), "\n")
1317 }
1318 (_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"),
1319 (_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"),
1320 };
1321
1322 if number < 0 {
1323 return Err(error::fmt!(
1324 InvalidTimestamp,
1325 "Timestamp {} is negative. It must be >= 0.",
1326 number
1327 ));
1328 }
1329
1330 let mut buf = itoa::Buffer::new();
1331 let printed = buf.format(number);
1332 self.output.push(b' ');
1333 self.output.extend_from_slice(printed.as_bytes());
1334 self.output.extend_from_slice(termination.as_bytes());
1335 self.state.op_case = OpCase::MayFlushOrTable;
1336 self.state.row_count += 1;
1337 Ok(())
1338 }
1339
1340 /// Complete the current row without providing a timestamp. The QuestDB instance
1341 /// will insert its own timestamp.
1342 ///
1343 /// Letting the server assign the timestamp can be faster since it reliably avoids
1344 /// out-of-order operations in the database for maximum ingestion throughput. However,
1345 /// it removes the ability to deduplicate rows.
1346 ///
1347 /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1348 /// server will set the timestamp only after receiving the row. If you're flushing
1349 /// infrequently, the server-assigned timestamp may be significantly behind the
1350 /// time the data was recorded in the buffer.
1351 ///
1352 /// In almost all cases, you should prefer the [Buffer::at] function.
1353 ///
1354 /// After this call, you can start recording the next row by calling [Buffer::table]
1355 /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1356 /// its variants.
1357 ///
1358 /// ```no_run
1359 /// # use questdb::Result;
1360 /// # use questdb::ingress::{Buffer, SenderBuilder};
1361 /// # fn main() -> Result<()> {
1362 /// # let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1363 /// # let mut buffer = sender.new_buffer();
1364 /// # buffer.table("x")?.symbol("a", "b")?;
1365 /// buffer.at_now()?;
1366 /// # Ok(())
1367 /// # }
1368 /// ```
1369 pub fn at_now(&mut self) -> crate::Result<()> {
1370 self.check_op(Op::At)?;
1371 self.output.push(b'\n');
1372 self.state.op_case = OpCase::MayFlushOrTable;
1373 self.state.row_count += 1;
1374 Ok(())
1375 }
1376}
1377
1378impl Debug for Buffer {
1379 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1380 f.debug_struct("Buffer")
1381 .field("output", &DebugBytes(&self.output))
1382 .field("state", &self.state)
1383 .field("marker", &self.marker)
1384 .field("max_name_len", &self.max_name_len)
1385 .field("protocol_version", &self.protocol_version)
1386 .finish()
1387 }
1388}