use crate::ingress::decimal::DecimalView;
use crate::ingress::ndarr::{ArrayElementSealed, check_and_get_array_bytes_size};
use crate::ingress::{
ARRAY_BINARY_FORMAT_TYPE, ArrayElement, DOUBLE_BINARY_FORMAT_TYPE, DebugBytes, MAX_ARRAY_DIMS,
MAX_NAME_LEN_DEFAULT, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros, TimestampNanos,
ndarr,
};
use crate::{Error, error};
use std::fmt::{Debug, Formatter};
use std::num::NonZeroUsize;
use std::slice::from_raw_parts_mut;
/// Appends `s` to `output`, inserting a backslash (`\`) in front of every
/// byte for which `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on `output` twice: once before the (escaped) text
/// is written and once after it, which lets a caller surround the text with
/// a delimiter. Pass a no-op to emit the text without delimiters.
///
/// The escaped bytes are appended with safe `Vec` operations only: the exact
/// final size is reserved up front, so no reallocation happens while the
/// bytes are pushed and no uninitialised memory is ever exposed through the
/// vector's length.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut Vec<u8>, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // First pass: count the escapes so the common "nothing to escape" case
    // can be served by a single bulk copy.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output);

    if to_escape == 0 {
        output.extend_from_slice(s.as_bytes());
    } else {
        // One extra byte (the backslash) per escaped input byte.
        output.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output.push(b'\\');
            }
            output.push(b);
        }
    }

    quoting_fn(output);
}
/// Returns `true` when byte `c` is one of: space, comma, equals sign,
/// line feed, carriage return or backslash.
fn must_escape_unquoted(c: u8) -> bool {
    const ESCAPED: [u8; 6] = [b' ', b',', b'=', b'\n', b'\r', b'\\'];
    ESCAPED.contains(&c)
}
/// Returns `true` when byte `c` is one of: line feed, carriage return,
/// double quote or backslash.
fn must_escape_quoted(c: u8) -> bool {
    const ESCAPED: [u8; 4] = [b'\n', b'\r', b'"', b'\\'];
    ESCAPED.contains(&c)
}
/// Appends `s` to `output` with no surrounding delimiter, backslash-escaping
/// every byte selected by `must_escape_unquoted`.
fn write_escaped_unquoted(output: &mut Vec<u8>, s: &str) {
    // Unquoted text has no delimiter, so the quoting hook does nothing.
    fn no_quotes(_buf: &mut Vec<u8>) {}

    write_escaped_impl(must_escape_unquoted, no_quotes, output, s);
}
/// Appends `s` to `output` wrapped in double quotes, backslash-escaping
/// every byte selected by `must_escape_quoted`.
fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
    // Called once before and once after the text: opening and closing quote.
    fn push_quote(buf: &mut Vec<u8>) {
        buf.push(b'"');
    }

    write_escaped_impl(must_escape_quoted, push_quote, output, s);
}
/// Formats a single `f64` as text, borrowing the result from an internal
/// scratch buffer.
pub(crate) struct F64Serializer {
/// Scratch space `ryu` writes the textual form of finite values into.
buf: ryu::Buffer,
/// The value to be formatted.
n: f64,
}
impl F64Serializer {
    /// Creates a serializer for `n` with a fresh formatting buffer.
    pub(crate) fn new(n: f64) -> Self {
        Self {
            buf: ryu::Buffer::new(),
            n,
        }
    }

    /// Picks the text for a value that is not finite, by inspecting the raw
    /// bits: any mantissa bit set yields `"NaN"`; otherwise the sign bit
    /// selects between `"-Infinity"` and `"Infinity"`.
    ///
    /// `as_str` only calls this when `f64::is_finite` is `false`.
    #[cold]
    fn format_nonfinite(&self) -> &'static str {
        const MANTISSA_MASK: u64 = 0x000f_ffff_ffff_ffff;
        const SIGN_MASK: u64 = 0x8000_0000_0000_0000;

        let bits = self.n.to_bits();
        let has_mantissa = bits & MANTISSA_MASK != 0;
        let is_negative = bits & SIGN_MASK != 0;
        match (has_mantissa, is_negative) {
            (true, _) => "NaN",
            (false, true) => "-Infinity",
            (false, false) => "Infinity",
        }
    }

    /// Returns the textual form of the value: `ryu`'s output for finite
    /// numbers, or `"NaN"` / `"Infinity"` / `"-Infinity"` otherwise.
    pub(crate) fn as_str(&mut self) -> &str {
        if !self.n.is_finite() {
            return self.format_nonfinite();
        }
        self.buf.format_finite(self.n)
    }
}
/// The kinds of call that can be made on a `Buffer`.
///
/// Every variant occupies its own bit, so a set of permitted operations can
/// be expressed as the bitwise OR of discriminants (this is how `OpCase`
/// is defined) and membership can be tested with a bitwise AND.
#[derive(Debug, Copy, Clone)]
enum Op {
/// Bit 0.
Table = 1,
/// Bit 1.
Symbol = 1 << 1,
/// Bit 2.
Column = 1 << 2,
/// Bit 3.
At = 1 << 3,
/// Bit 4.
Flush = 1 << 4,
}
impl Op {
fn descr(self) -> &'static str {
match self {
Op::Table => "table",
Op::Symbol => "symbol",
Op::Column => "column",
Op::At => "at",
Op::Flush => "flush",
}
}
}
/// The state of the line being built, encoded as the set of operations
/// that are permitted next.
///
/// Each discriminant is the bitwise OR of the `Op` bits allowed in that
/// state, so "is `op` allowed?" is a single AND against the discriminant.
#[derive(Debug, Copy, Clone, PartialEq)]
enum OpCase {
/// Only `table` is allowed.
Init = Op::Table as isize,
/// `symbol` or `column` is allowed.
TableWritten = Op::Symbol as isize | Op::Column as isize,
/// `symbol`, `column` or `at` is allowed.
SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
/// `column` or `at` is allowed.
ColumnWritten = Op::Column as isize | Op::At as isize,
/// `flush` or `table` is allowed.
MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
}
impl OpCase {
fn next_op_descr(self) -> &'static str {
match self {
OpCase::Init => "should have called `table` instead",
OpCase::TableWritten => "should have called `symbol` or `column` instead",
OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
OpCase::ColumnWritten => "should have called `column` or `at` instead",
OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
}
}
}
/// Bookkeeping for a `Buffer` that is saved by `set_marker` and restored by
/// `rewind_to_marker`.
#[derive(Debug, Clone, Copy)]
struct BufferState {
/// Which operations are valid next.
op_case: OpCase,
/// Number of rows completed so far (incremented by `at` and `at_now`).
row_count: usize,
/// Length in bytes of the first table name as written (escaped) at the
/// start of the output; `None` until `table` has been called once.
first_table_len: Option<NonZeroUsize>,
/// `true` until a `table` call writes a name that differs from the first.
transactional: bool,
}
impl BufferState {
fn new() -> Self {
Self {
op_case: OpCase::Init,
row_count: 0,
first_table_len: None,
transactional: true,
}
}
}
/// A borrowed table name.
///
/// Values built with `TableName::new` (or `TryFrom<&str>`) have passed the
/// validation in `new`; values built with `new_unchecked` have not.
#[derive(Clone, Copy)]
pub struct TableName<'a> {
/// The name text, borrowed from the caller.
name: &'a str,
}
impl<'a> TableName<'a> {
    /// Validates `name` and wraps it as a table name.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidName` error when `name`
    /// * is empty,
    /// * starts with a dot, ends with a dot, or contains two dots in a row,
    /// * contains one of the rejected punctuation or control characters
    ///   listed in the match below, or
    /// * contains U+FEFF (the byte-order mark).
    ///
    /// Positions reported in error messages are byte offsets into `name`.
    pub fn new(name: &'a str) -> crate::Result<Self> {
        if name.is_empty() {
            return Err(error::fmt!(
                InvalidName,
                "Table names must have a non-zero length."
            ));
        }

        let mut prev = '\0';
        // `char_indices` yields *byte* offsets. That is required here:
        // the trailing-dot test compares against `name.len()` (a byte
        // length), and the messages report a byte position. Enumerating
        // characters instead would miss a trailing dot after any multi-byte
        // character.
        for (index, c) in name.char_indices() {
            match c {
                '.' => {
                    // `name` is non-empty, so `name.len() - 1` cannot underflow.
                    if index == 0 || index == name.len() - 1 || prev == '.' {
                        return Err(error::fmt!(
                            InvalidName,
                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
                            name,
                            index
                        ));
                    }
                }
                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
                    return Err(error::fmt!(
                        InvalidName,
                        concat!(
                            "Bad string {:?}: ",
                            "Table names can't contain ",
                            "a {:?} character, which was found at ",
                            "byte position {}."
                        ),
                        name,
                        c,
                        index
                    ));
                }
                '\u{feff}' => {
                    return Err(error::fmt!(
                        InvalidName,
                        concat!(
                            "Bad string {:?}: ",
                            "Table names can't contain ",
                            "a UTF-8 BOM character, which was found at ",
                            "byte position {}."
                        ),
                        name,
                        index
                    ));
                }
                _ => (),
            }
            prev = c;
        }

        Ok(Self { name })
    }

    /// Wraps `name` without performing any of the checks done by `new`.
    pub fn new_unchecked(name: &'a str) -> Self {
        Self { name }
    }
}
impl<'a> TryFrom<&'a str> for TableName<'a> {
type Error = Error;
fn try_from(name: &'a str) -> crate::Result<Self> {
Self::new(name)
}
}
impl AsRef<str> for TableName<'_> {
fn as_ref(&self) -> &str {
self.name
}
}
/// A borrowed column (or symbol) name.
///
/// Values built with `ColumnName::new` (or `TryFrom<&str>`) have passed the
/// validation in `new`; values built with `new_unchecked` have not.
#[derive(Clone, Copy)]
pub struct ColumnName<'a> {
/// The name text, borrowed from the caller.
name: &'a str,
}
impl<'a> ColumnName<'a> {
    /// Validates `name` and wraps it as a column name.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidName` error when `name`
    /// * is empty,
    /// * contains one of the rejected punctuation or control characters
    ///   listed in the match below (this includes `.` and `-`), or
    /// * contains U+FEFF (the byte-order mark).
    ///
    /// Positions reported in error messages are byte offsets into `name`.
    pub fn new(name: &'a str) -> crate::Result<Self> {
        if name.is_empty() {
            return Err(error::fmt!(
                InvalidName,
                "Column names must have a non-zero length."
            ));
        }

        // `char_indices` (not `chars().enumerate()`) so that the reported
        // position really is the byte position the messages claim it is.
        for (index, c) in name.char_indices() {
            match c {
                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
                    return Err(error::fmt!(
                        InvalidName,
                        concat!(
                            "Bad string {:?}: ",
                            "Column names can't contain ",
                            "a {:?} character, which was found at ",
                            "byte position {}."
                        ),
                        name,
                        c,
                        index
                    ));
                }
                '\u{FEFF}' => {
                    return Err(error::fmt!(
                        InvalidName,
                        concat!(
                            "Bad string {:?}: ",
                            "Column names can't contain ",
                            "a UTF-8 BOM character, which was found at ",
                            "byte position {}."
                        ),
                        name,
                        index
                    ));
                }
                _ => (),
            }
        }

        Ok(Self { name })
    }

    /// Wraps `name` without performing any of the checks done by `new`.
    pub fn new_unchecked(name: &'a str) -> Self {
        Self { name }
    }
}
impl<'a> TryFrom<&'a str> for ColumnName<'a> {
type Error = Error;
fn try_from(name: &'a str) -> crate::Result<Self> {
Self::new(name)
}
}
impl AsRef<str> for ColumnName<'_> {
fn as_ref(&self) -> &str {
self.name
}
}
/// Accumulates serialized rows as raw bytes, enforcing the order in which
/// `table`, `symbol`, `column_*` and `at`/`at_now` may be called.
#[derive(Clone)]
pub struct Buffer {
/// The serialized bytes written so far.
output: Vec<u8>,
/// Call-order state, row count and transactional flag.
state: BufferState,
/// Output length and state captured by `set_marker`, if a marker is set.
marker: Option<(usize, BufferState)>,
/// Maximum accepted length, in bytes, of table and column names.
max_name_len: usize,
/// Protocol version that selects the encoding of some column types.
protocol_version: ProtocolVersion,
}
impl Buffer {
    /// Creates an empty buffer for `protocol_version`, limiting names to
    /// `MAX_NAME_LEN_DEFAULT` bytes.
    pub fn new(protocol_version: ProtocolVersion) -> Self {
        Self::with_max_name_len(protocol_version, MAX_NAME_LEN_DEFAULT)
    }

    /// Creates an empty buffer for `protocol_version`, limiting table and
    /// column names to `max_name_len` bytes.
    pub fn with_max_name_len(protocol_version: ProtocolVersion, max_name_len: usize) -> Self {
        Self {
            output: Vec::new(),
            state: BufferState::new(),
            marker: None,
            max_name_len,
            protocol_version,
        }
    }

    /// The protocol version this buffer serializes for.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.protocol_version
    }

    /// Reserves capacity for at least `additional` more bytes of output.
    pub fn reserve(&mut self, additional: usize) {
        self.output.reserve(additional);
    }

    /// Number of bytes written so far.
    pub fn len(&self) -> usize {
        self.output.len()
    }

    /// Number of rows completed so far (rows ended with `at` or `at_now`).
    pub fn row_count(&self) -> usize {
        self.state.row_count
    }

    /// `true` as long as every `table` call so far has written the same name
    /// as the first one (also `true` for a buffer with no rows).
    pub fn transactional(&self) -> bool {
        self.state.transactional
    }

    /// `true` when no bytes have been written.
    pub fn is_empty(&self) -> bool {
        self.output.is_empty()
    }

    /// Number of bytes the output can hold without reallocating.
    pub fn capacity(&self) -> usize {
        self.output.capacity()
    }

    /// The bytes written so far.
    pub fn as_bytes(&self) -> &[u8] {
        &self.output
    }

    /// Records the current output length and state so that a later
    /// `rewind_to_marker` can discard everything written after this point.
    ///
    /// # Errors
    ///
    /// Returns `InvalidApiCall` when a line is partially written, i.e. when
    /// `table` is not one of the calls permitted next.
    pub fn set_marker(&mut self) -> crate::Result<()> {
        if (self.state.op_case as isize & Op::Table as isize) == 0 {
            return Err(error::fmt!(
                InvalidApiCall,
                concat!(
                    "Can't set the marker whilst constructing a line. ",
                    "A marker may only be set on an empty buffer or after ",
                    "`at` or `at_now` is called."
                )
            ));
        }
        self.marker = Some((self.output.len(), self.state));
        Ok(())
    }

    /// Truncates the output back to the marked length, restores the state
    /// saved with it and clears the marker.
    ///
    /// # Errors
    ///
    /// Returns `InvalidApiCall` when no marker is set.
    pub fn rewind_to_marker(&mut self) -> crate::Result<()> {
        if let Some((position, state)) = self.marker.take() {
            self.output.truncate(position);
            self.state = state;
            Ok(())
        } else {
            Err(error::fmt!(
                InvalidApiCall,
                "Can't rewind to the marker: No marker set."
            ))
        }
    }

    /// Discards the marker, if any, leaving the output untouched.
    pub fn clear_marker(&mut self) {
        self.marker = None;
    }

    /// Removes all output, resets the state to that of a new buffer and
    /// discards the marker. Name-length limit and protocol version are kept.
    pub fn clear(&mut self) {
        self.output.clear();
        self.state = BufferState::new();
        self.marker = None;
    }

    /// Checks that `op` is among the operations permitted in the current
    /// state, producing an `InvalidApiCall` error otherwise.
    #[inline(always)]
    fn check_op(&self, op: Op) -> crate::Result<()> {
        if (self.state.op_case as isize & op as isize) > 0 {
            Ok(())
        } else {
            Err(error::fmt!(
                InvalidApiCall,
                "State error: Bad call to `{}`, {}.",
                op.descr(),
                self.state.op_case.next_op_descr()
            ))
        }
    }

    /// Succeeds when the buffer is in a state where `flush` is permitted
    /// (after at least one completed row, with no partially written line).
    #[inline(always)]
    pub fn check_can_flush(&self) -> crate::Result<()> {
        self.check_op(Op::Flush)
    }

    /// Rejects names whose length in bytes exceeds `max_name_len`.
    #[inline(always)]
    fn validate_max_name_len(&self, name: &str) -> crate::Result<()> {
        if name.len() > self.max_name_len {
            return Err(error::fmt!(
                InvalidName,
                "Bad name: {:?}: Too long (max {} characters)",
                name,
                self.max_name_len
            ));
        }
        Ok(())
    }

    /// Starts a new line by writing the (escaped) table name.
    ///
    /// The first table name written is remembered; if a later call writes a
    /// different name the buffer stops being reported as transactional.
    ///
    /// # Errors
    ///
    /// Fails if `name` cannot be converted to a `TableName`, is longer than
    /// the configured maximum, or `table` is not permitted in the current
    /// state. Nothing is written in any of these cases.
    pub fn table<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
    where
        N: TryInto<TableName<'a>>,
        Error: From<N::Error>,
    {
        let name: TableName<'a> = name.try_into()?;
        self.validate_max_name_len(name.name)?;
        self.check_op(Op::Table)?;
        let table_begin = self.output.len();
        write_escaped_unquoted(&mut self.output, name.name);
        let table_end = self.output.len();
        self.state.op_case = OpCase::TableWritten;

        if let Some(first_table_len) = &self.state.first_table_len {
            // Compare the escaped bytes just written against the escaped
            // bytes of the first table name, which sit at the very start of
            // the output.
            let first_table = &self.output[0..first_table_len.get()];
            let this_table = &self.output[table_begin..table_end];
            if first_table != this_table {
                self.state.transactional = false;
            }
        } else {
            debug_assert!(table_begin == 0);
            // `NonZeroUsize::new` yields `None` only for 0. A validated
            // table name is non-empty, so `table_end` is expected to be
            // non-zero; the `Option` is stored as-is rather than unwrapped
            // and re-wrapped.
            let first_table_len = NonZeroUsize::new(table_end);
            debug_assert!(first_table_len.is_some());
            self.state.first_table_len = first_table_len;
        }
        Ok(self)
    }

    /// Appends `,name=value` to the current line, escaping both parts.
    ///
    /// # Errors
    ///
    /// Fails if `name` cannot be converted to a `ColumnName`, is longer than
    /// the configured maximum, or `symbol` is not permitted in the current
    /// state. Nothing is written in any of these cases.
    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        S: AsRef<str>,
        Error: From<N::Error>,
    {
        let name: ColumnName<'a> = name.try_into()?;
        self.validate_max_name_len(name.name)?;
        self.check_op(Op::Symbol)?;
        self.output.push(b',');
        write_escaped_unquoted(&mut self.output, name.name);
        self.output.push(b'=');
        write_escaped_unquoted(&mut self.output, value.as_ref());
        self.state.op_case = OpCase::SymbolWritten;
        Ok(self)
    }

    /// Validates `name`, then writes the separator, the escaped column name
    /// and `=`, and moves the state to `ColumnWritten`.
    ///
    /// The separator is a space for the first column of a line (when symbols
    /// are still permitted in the current state) and a comma afterwards.
    /// Nothing is written when an error is returned.
    fn write_column_key<'a, N>(&mut self, name: N) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        Error: From<N::Error>,
    {
        let name: ColumnName<'a> = name.try_into()?;
        self.validate_max_name_len(name.name)?;
        self.check_op(Op::Column)?;
        self.output
            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
                b' '
            } else {
                b','
            });
        write_escaped_unquoted(&mut self.output, name.name);
        self.output.push(b'=');
        self.state.op_case = OpCase::ColumnWritten;
        Ok(self)
    }

    /// Appends a boolean column, encoded as `t` or `f`.
    ///
    /// # Errors
    ///
    /// Fails as described for `write_column_key`.
    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        Error: From<N::Error>,
    {
        self.write_column_key(name)?;
        self.output.push(if value { b't' } else { b'f' });
        Ok(self)
    }

    /// Appends an integer column, encoded as decimal digits followed by `i`.
    ///
    /// # Errors
    ///
    /// Fails as described for `write_column_key`.
    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        Error: From<N::Error>,
    {
        self.write_column_key(name)?;
        let mut buf = itoa::Buffer::new();
        let printed = buf.format(value);
        self.output.extend_from_slice(printed.as_bytes());
        self.output.push(b'i');
        Ok(self)
    }

    /// Appends a floating-point column.
    ///
    /// With protocol version 1 the value is written as text. With any other
    /// version it is written in binary form: a second `=`, the
    /// `DOUBLE_BINARY_FORMAT_TYPE` tag and the eight little-endian bytes of
    /// the value.
    ///
    /// # Errors
    ///
    /// Fails as described for `write_column_key`.
    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        Error: From<N::Error>,
    {
        self.write_column_key(name)?;
        if !matches!(self.protocol_version, ProtocolVersion::V1) {
            self.output.push(b'=');
            self.output.push(DOUBLE_BINARY_FORMAT_TYPE);
            self.output.extend_from_slice(&value.to_le_bytes())
        } else {
            let mut ser = F64Serializer::new(value);
            self.output.extend_from_slice(ser.as_str().as_bytes())
        }
        Ok(self)
    }

    /// Appends a string column, double-quoted and escaped.
    ///
    /// # Errors
    ///
    /// Fails as described for `write_column_key`.
    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        S: AsRef<str>,
        Error: From<N::Error>,
    {
        self.write_column_key(name)?;
        write_escaped_quoted(&mut self.output, value.as_ref());
        Ok(self)
    }

    /// Appends a decimal column, serialized by `DecimalView::serialize`.
    ///
    /// # Errors
    ///
    /// Returns `ProtocolVersionError` when the protocol version is below 3,
    /// the conversion error when `value` cannot be viewed as a decimal, or
    /// any error described for `write_column_key`. Nothing is written in any
    /// of these cases.
    pub fn column_dec<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        S: TryInto<DecimalView<'a>>,
        Error: From<N::Error>,
        Error: From<S::Error>,
    {
        if self.protocol_version < ProtocolVersion::V3 {
            return Err(error::fmt!(
                ProtocolVersionError,
                "Protocol version {} does not support the decimal datatype",
                self.protocol_version
            ));
        }
        // Convert before writing the key so a bad value leaves no trace.
        let value: DecimalView = value.try_into()?;
        self.write_column_key(name)?;
        value.serialize(&mut self.output);
        Ok(self)
    }

    /// Appends an n-dimensional array column in binary form.
    ///
    /// After the column key the following is written: a second `=`, the
    /// `ARRAY_BINARY_FORMAT_TYPE` tag, the element type tag, the number of
    /// dimensions as one byte, each dimension length as a little-endian
    /// `u32`, and finally the element data produced by
    /// `ndarr::write_array_data`.
    ///
    /// # Errors
    ///
    /// Returns `ProtocolVersionError` when the protocol version is below 2,
    /// `ArrayError` when the view has zero or more than `MAX_ARRAY_DIMS`
    /// dimensions, and propagates errors from the size check, the column key
    /// and the array view.
    ///
    /// NOTE(review): an error from `view.dim` or `ndarr::write_array_data`
    /// occurs after the column key and header bytes were appended; callers
    /// that must recover should use `set_marker` / `rewind_to_marker`.
    #[allow(private_bounds)]
    pub fn column_arr<'a, N, T, D>(&mut self, name: N, view: &T) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        T: NdArrayView<D>,
        D: ArrayElement + ArrayElementSealed,
        Error: From<N::Error>,
    {
        if self.protocol_version < ProtocolVersion::V2 {
            return Err(error::fmt!(
                ProtocolVersionError,
                "Protocol version {} does not support array datatype",
                self.protocol_version
            ));
        }
        let ndim = view.ndim();
        if ndim == 0 {
            return Err(error::fmt!(
                ArrayError,
                "Zero-dimensional arrays are not supported",
            ));
        }
        if MAX_ARRAY_DIMS < ndim {
            return Err(error::fmt!(
                ArrayError,
                "Array dimension mismatch: expected at most {} dimensions, but got {}",
                MAX_ARRAY_DIMS,
                ndim
            ));
        }
        let array_buf_size = check_and_get_array_bytes_size(view)?;
        self.write_column_key(name)?;
        // Binary-format marker, then the entity type, element type and rank.
        self.output.push(b'=');
        self.output.push(ARRAY_BINARY_FORMAT_TYPE);
        self.output.push(D::type_tag());
        // `ndim` was bounded by `MAX_ARRAY_DIMS` above.
        // NOTE(review): assumes MAX_ARRAY_DIMS fits in a u8 - confirm.
        self.output.push(ndim as u8);

        // Reserve room for the shape header and the data in one go.
        let dim_header_size = size_of::<u32>() * ndim;
        self.output.reserve(dim_header_size + array_buf_size);
        for i in 0..ndim {
            // NOTE(review): presumably `check_and_get_array_bytes_size`
            // guarantees each dimension fits in a u32 - confirm.
            self.output
                .extend_from_slice((view.dim(i)? as u32).to_le_bytes().as_slice());
        }

        let index = self.output.len();
        // SAFETY: the `reserve` call above guaranteed capacity for
        // `dim_header_size + array_buf_size` bytes past the length at that
        // time, and the loop appended exactly `dim_header_size` bytes, so
        // `index..index + array_buf_size` lies within the allocation.
        let writeable =
            unsafe { from_raw_parts_mut(self.output.as_mut_ptr().add(index), array_buf_size) };
        ndarr::write_array_data(view, writeable, array_buf_size)?;
        // SAFETY: the new length is within capacity (see above).
        // NOTE(review): relies on `write_array_data` having initialised all
        // `array_buf_size` bytes on success - confirm.
        unsafe { self.output.set_len(array_buf_size + index) }
        Ok(self)
    }

    /// Appends a timestamp column.
    ///
    /// With protocol version 1 the value is converted to microseconds and
    /// written with a `t` suffix. With any other version microsecond values
    /// are written with a `t` suffix and nanosecond values with an `n`
    /// suffix.
    ///
    /// # Errors
    ///
    /// Returns the conversion error when `value` cannot be converted to a
    /// timestamp, or any error described for `write_column_key`. The value
    /// is converted before anything is written, so a failed call leaves the
    /// buffer and its state untouched.
    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> crate::Result<&mut Self>
    where
        N: TryInto<ColumnName<'a>>,
        T: TryInto<Timestamp>,
        Error: From<N::Error>,
        Error: From<T::Error>,
    {
        // Do every fallible conversion first: writing the key beforehand
        // would leave a dangling `name=` (and an advanced state) behind if a
        // conversion then failed.
        let timestamp: Timestamp = value.try_into()?;
        let (number, suffix) = match (self.protocol_version, timestamp) {
            (ProtocolVersion::V1, _) => {
                let timestamp: TimestampMicros = timestamp.try_into()?;
                (timestamp.as_i64(), b't')
            }
            (_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'),
            (_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'),
        };
        self.write_column_key(name)?;
        let mut buf = itoa::Buffer::new();
        let printed = buf.format(number);
        self.output.extend_from_slice(printed.as_bytes());
        self.output.push(suffix);
        Ok(self)
    }

    /// Ends the current row with an explicit designated timestamp.
    ///
    /// With protocol version 1 the value is converted to nanoseconds and
    /// written without a suffix. With any other version microsecond values
    /// get a `t` suffix and nanosecond values an `n` suffix. The row is
    /// terminated with a newline and the row count is incremented.
    ///
    /// # Errors
    ///
    /// Returns `InvalidApiCall` when `at` is not permitted in the current
    /// state, the conversion error when `timestamp` cannot be converted, and
    /// `InvalidTimestamp` when the resulting number is negative. Nothing is
    /// written in any of these cases.
    pub fn at<T>(&mut self, timestamp: T) -> crate::Result<()>
    where
        T: TryInto<Timestamp>,
        Error: From<T::Error>,
    {
        self.check_op(Op::At)?;
        let timestamp: Timestamp = timestamp.try_into()?;
        let (number, termination) = match (self.protocol_version, timestamp) {
            (ProtocolVersion::V1, _) => {
                let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
                (timestamp?.as_i64(), "\n")
            }
            (_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"),
            (_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"),
        };
        if number < 0 {
            return Err(error::fmt!(
                InvalidTimestamp,
                "Timestamp {} is negative. It must be >= 0.",
                number
            ));
        }
        let mut buf = itoa::Buffer::new();
        let printed = buf.format(number);
        self.output.push(b' ');
        self.output.extend_from_slice(printed.as_bytes());
        self.output.extend_from_slice(termination.as_bytes());
        self.state.op_case = OpCase::MayFlushOrTable;
        self.state.row_count += 1;
        Ok(())
    }

    /// Ends the current row without a timestamp: only the terminating
    /// newline is written, and the row count is incremented.
    ///
    /// # Errors
    ///
    /// Returns `InvalidApiCall` when `at` is not permitted in the current
    /// state.
    pub fn at_now(&mut self) -> crate::Result<()> {
        self.check_op(Op::At)?;
        self.output.push(b'\n');
        self.state.op_case = OpCase::MayFlushOrTable;
        self.state.row_count += 1;
        Ok(())
    }
}
/// Debug formatting that lists every field, rendering the output bytes
/// through `DebugBytes`.
impl Debug for Buffer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to `Buffer` will fail to
        // compile here until it is included in the debug output.
        let Self {
            output,
            state,
            marker,
            max_name_len,
            protocol_version,
        } = self;

        let mut dbg = f.debug_struct("Buffer");
        dbg.field("output", &DebugBytes(output));
        dbg.field("state", state);
        dbg.field("marker", marker);
        dbg.field("max_name_len", max_name_len);
        dbg.field("protocol_version", protocol_version);
        dbg.finish()
    }
}