use crate::storage::schema::Value;
use crate::storage::unified::{EntityData, EntityKind, UnifiedEntity};
pub(crate) fn timeseries_tags_json_value(
tags: &std::collections::HashMap<String, String>,
) -> Value {
let object = tags
.iter()
.map(|(key, value)| (key.clone(), crate::json::Value::String(value.clone())))
.collect();
let json = crate::json::Value::Object(object);
Value::Json(crate::json::to_vec(&json).unwrap_or_default())
}
#[inline(always)]
pub(crate) fn write_u64(buf: &mut Vec<u8>, n: u64) {
let mut b = itoa::Buffer::new();
let s = b.format(n);
buf.extend_from_slice(s.as_bytes());
}
#[inline(always)]
pub(crate) fn write_timestamp_fields_json(buf: &mut Vec<u8>, entity: &UnifiedEntity) {
let (row_created_at, row_updated_at) = match &entity.data {
EntityData::Row(row) => {
let mut created = None;
let mut updated = None;
for (key, value) in row.iter_fields() {
match key {
"created_at" => created = Some(value.clone()),
"updated_at" => updated = Some(value.clone()),
_ => {}
}
if created.is_some() && updated.is_some() {
break;
}
}
(created, updated)
}
_ => (None, None),
};
buf.extend_from_slice(b"\"created_at\":");
if let Some(v) = &row_created_at {
write_value_bytes(buf, v);
} else {
write_u64(buf, entity.created_at);
}
buf.extend_from_slice(b",\"updated_at\":");
if let Some(v) = &row_updated_at {
write_value_bytes(buf, v);
} else {
write_u64(buf, entity.updated_at);
}
}
#[inline(always)]
pub(crate) fn write_entity_json_bytes(buf: &mut Vec<u8>, entity: &UnifiedEntity) {
buf.push(b'{');
buf.extend_from_slice(b"\"red_entity_id\":");
write_u64(buf, entity.id.raw());
buf.extend_from_slice(b",\"red_collection\":");
write_json_bytes(buf, entity.kind.collection().as_bytes());
buf.extend_from_slice(b",\"red_kind\":");
write_json_bytes(buf, entity.kind.storage_type().as_bytes());
buf.push(b',');
write_timestamp_fields_json(buf, entity);
buf.extend_from_slice(b",\"red_sequence_id\":");
write_u64(buf, entity.sequence_id);
let (entity_type, capabilities) = match &entity.data {
EntityData::Row(_) => ("table", "structured,table"),
EntityData::TimeSeries(_) => ("timeseries", "timeseries,metric,temporal"),
_ => ("unknown", "unknown"),
};
buf.extend_from_slice(b",\"red_entity_type\":");
write_json_bytes(buf, entity_type.as_bytes());
buf.extend_from_slice(b",\"red_capabilities\":");
write_json_bytes(buf, capabilities.as_bytes());
if let EntityKind::TableRow { row_id, .. } = &entity.kind {
buf.extend_from_slice(b",\"row_id\":");
write_u64(buf, *row_id);
}
match &entity.data {
EntityData::Row(row) => {
for (key, value) in row.iter_fields() {
if key == "created_at" || key == "updated_at" {
continue;
}
buf.push(b',');
write_json_bytes(buf, key.as_bytes());
buf.push(b':');
write_value_bytes(buf, value);
}
}
EntityData::TimeSeries(ts) => {
buf.extend_from_slice(b",\"metric\":");
write_json_bytes(buf, ts.metric.as_bytes());
buf.extend_from_slice(b",\"timestamp_ns\":");
write_u64(buf, ts.timestamp_ns);
buf.extend_from_slice(b",\"timestamp\":");
write_u64(buf, ts.timestamp_ns);
buf.extend_from_slice(b",\"time\":");
write_u64(buf, ts.timestamp_ns);
buf.extend_from_slice(b",\"value\":");
write_value_bytes(buf, &Value::Float(ts.value));
if !ts.tags.is_empty() {
buf.extend_from_slice(b",\"tags\":");
write_value_bytes(buf, ×eries_tags_json_value(&ts.tags));
}
}
_ => {}
}
buf.push(b'}');
}
#[inline(always)]
pub(crate) fn write_json_bytes(buf: &mut Vec<u8>, s: &[u8]) {
buf.push(b'"');
for &b in s {
match b {
b'"' => buf.extend_from_slice(b"\\\""),
b'\\' => buf.extend_from_slice(b"\\\\"),
b'\n' => buf.extend_from_slice(b"\\n"),
b'\r' => buf.extend_from_slice(b"\\r"),
b'\t' => buf.extend_from_slice(b"\\t"),
b if b < 0x20 => {
buf.extend_from_slice(b"\\u00");
let hi = b >> 4;
let lo = b & 0xf;
buf.push(if hi < 10 { b'0' + hi } else { b'a' + hi - 10 });
buf.push(if lo < 10 { b'0' + lo } else { b'a' + lo - 10 });
}
_ => buf.push(b),
}
}
buf.push(b'"');
}
#[inline(always)]
pub(crate) fn write_value_bytes(buf: &mut Vec<u8>, value: &Value) {
match value {
Value::Null => buf.extend_from_slice(b"null"),
Value::Boolean(true) => buf.extend_from_slice(b"true"),
Value::Boolean(false) => buf.extend_from_slice(b"false"),
Value::Integer(n) => {
let mut b = itoa::Buffer::new();
let s = b.format(*n);
buf.extend_from_slice(s.as_bytes());
}
Value::UnsignedInteger(n) => {
let mut b = itoa::Buffer::new();
let s = b.format(*n);
buf.extend_from_slice(s.as_bytes());
}
Value::Float(f) => {
if f.is_finite() {
use std::io::Write;
let _ = write!(buf, "{}", f);
} else {
buf.extend_from_slice(b"null");
}
}
Value::Text(s) => write_json_bytes(buf, s.as_bytes()),
Value::Array(values) => {
buf.push(b'[');
for (index, value) in values.iter().enumerate() {
if index > 0 {
buf.push(b',');
}
write_value_bytes(buf, value);
}
buf.push(b']');
}
Value::Json(value) => {
if std::str::from_utf8(value).is_ok() {
buf.extend_from_slice(value);
} else {
buf.extend_from_slice(b"null");
}
}
Value::Timestamp(t) => {
let mut b = itoa::Buffer::new();
let s = b.format(*t);
buf.extend_from_slice(s.as_bytes());
}
_ => buf.extend_from_slice(b"null"),
}
}
pub(crate) fn execute_runtime_serialize_single_entity(entity: &UnifiedEntity) -> String {
let mut buf = Vec::with_capacity(512);
buf.extend_from_slice(
b"{\"columns\":[],\"record_count\":1,\"selection\":{\"scope\":\"any\"},\"records\":[",
);
write_entity_json_bytes(&mut buf, entity);
buf.extend_from_slice(b"]}");
unsafe { String::from_utf8_unchecked(buf) }
}