use crate::error::{FlowError, Result};
use crate::record::{ScanRange, increment_prefix_bytes};
use serde_json::Value;
use std::ops::Bound;
pub(crate) const DOC_PREFIX: u8 = 0x01;
pub(crate) const IDX_PREFIX: u8 = 0x02;
pub(crate) const SCH_PREFIX: u8 = 0x03;
pub(crate) const CTR_PREFIX: u8 = 0x04;
pub(crate) const SEP: u8 = 0x00;
pub(crate) fn doc_key(store: &str, key: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(2 + store.len() + key.len());
buf.push(DOC_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf.push(SEP);
buf.extend_from_slice(key);
buf
}
pub(crate) fn doc_prefix(store: &str) -> Vec<u8> {
let mut buf = Vec::with_capacity(2 + store.len());
buf.push(DOC_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf.push(SEP);
buf
}
pub(crate) fn idx_key(store: &str, index: &str, value: &[u8], key: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(3 + store.len() + index.len() + value.len() + key.len());
buf.push(IDX_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf.push(SEP);
buf.extend_from_slice(index.as_bytes());
buf.push(SEP);
buf.extend_from_slice(value);
buf.push(SEP);
buf.extend_from_slice(key);
buf
}
pub(crate) fn idx_prefix(store: &str, index: &str) -> Vec<u8> {
let mut buf = Vec::with_capacity(3 + store.len() + index.len());
buf.push(IDX_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf.push(SEP);
buf.extend_from_slice(index.as_bytes());
buf.push(SEP);
buf
}
pub(crate) fn idx_value_prefix(store: &str, index: &str, value: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(4 + store.len() + index.len() + value.len());
buf.push(IDX_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf.push(SEP);
buf.extend_from_slice(index.as_bytes());
buf.push(SEP);
buf.extend_from_slice(value);
buf.push(SEP);
buf
}
pub(crate) fn schema_key(store: &str) -> Vec<u8> {
let mut buf = Vec::with_capacity(1 + store.len());
buf.push(SCH_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf
}
pub(crate) fn schema_prefix() -> [u8; 1] {
[SCH_PREFIX]
}
pub(crate) fn counter_key(store: &str) -> Vec<u8> {
let mut buf = Vec::with_capacity(1 + store.len());
buf.push(CTR_PREFIX);
buf.extend_from_slice(store.as_bytes());
buf
}
pub(crate) fn prefix_range(prefix: &[u8]) -> ScanRange {
let end = increment_prefix_bytes(prefix);
ScanRange {
key_start: Bound::Included(prefix.to_vec()),
key_end: Bound::Excluded(end),
ts_start: Bound::Unbounded,
ts_end: Bound::Unbounded,
}
}
const TAG_NULL: u8 = 0x01;
const TAG_FALSE: u8 = 0x02;
const TAG_TRUE: u8 = 0x03;
const TAG_NUM_I64: u8 = 0x04;
const TAG_NUM_F64: u8 = 0x05;
const TAG_NUM_U64: u8 = 0x06;
const TAG_STRING: u8 = 0x07;
const TAG_OTHER: u8 = 0x08;
pub(crate) fn encode_index_value(value: &Value) -> Vec<u8> {
match value {
Value::Null => vec![TAG_NULL],
Value::Bool(false) => vec![TAG_FALSE],
Value::Bool(true) => vec![TAG_TRUE],
Value::Number(n) => {
if let Some(i) = n.as_i64() {
let mut buf = Vec::with_capacity(9);
buf.push(TAG_NUM_I64);
buf.extend_from_slice(&(i as u64 ^ (1u64 << 63)).to_be_bytes());
buf
} else if let Some(u) = n.as_u64() {
let mut buf = Vec::with_capacity(9);
buf.push(TAG_NUM_U64);
buf.extend_from_slice(&(u ^ (1u64 << 63)).to_be_bytes());
buf
} else if let Some(f) = n.as_f64() {
let bits = f.to_bits();
let adjusted = if bits & (1u64 << 63) != 0 {
!bits
} else {
bits ^ (1u64 << 63)
};
let mut buf = Vec::with_capacity(9);
buf.push(TAG_NUM_F64);
buf.extend_from_slice(&adjusted.to_be_bytes());
buf
} else {
vec![TAG_OTHER]
}
}
Value::String(s) => {
let bytes = s.as_bytes();
let mut buf = Vec::with_capacity(1 + bytes.len());
buf.push(TAG_STRING);
buf.extend_from_slice(bytes);
buf
}
_ => {
let mut buf = Vec::with_capacity(9);
buf.push(TAG_OTHER);
if let Ok(json) = serde_json::to_vec(value) {
buf.extend_from_slice(&json);
}
buf
}
}
}
pub(crate) fn encode_primary_key(value: &Value) -> Result<Vec<u8>> {
match value {
Value::String(s) => {
if s.contains('\x00') {
return Err(FlowError::JsonDb(
"primary key must not contain null byte (\\x00)".into(),
));
}
Ok(s.clone().into_bytes())
}
Value::Number(n) => Ok(n.to_string().into_bytes()),
Value::Bool(b) => Ok(b.to_string().into_bytes()),
Value::Null => Ok(vec![0x00]),
_ => serde_json::to_vec(value)
.map_err(FlowError::from)
.map(|mut v| {
v.insert(0, 0xFF);
v
}),
}
}
pub(crate) fn encode_doc(doc: &Value) -> Result<Vec<u8>> {
serde_json::to_vec(doc).map_err(FlowError::from)
}
pub(crate) fn decode_doc(bytes: &[u8]) -> Result<Value> {
serde_json::from_slice(bytes).map_err(FlowError::from)
}
pub(crate) fn extract_field(doc: &Value, path: &str) -> Option<Value> {
if path.is_empty() {
return Some(doc.clone());
}
let mut current = doc;
for segment in path.split('.') {
match current {
Value::Object(map) => {
current = map.get(segment)?;
}
Value::Array(arr) => {
let idx: usize = segment.parse().ok()?;
current = arr.get(idx)?;
}
_ => return None,
}
}
Some(current.clone())
}
pub(crate) fn validate_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(FlowError::JsonDb("name cannot be empty".into()));
}
if name.len() > 255 {
return Err(FlowError::JsonDb("name too long (max 255 bytes)".into()));
}
if !name
.bytes()
.all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-')
{
return Err(FlowError::JsonDb(format!(
"invalid name '{}': only alphanumeric, underscore and dash are allowed",
name
)));
}
Ok(())
}
pub(crate) fn encode_composite_value(values: &[Value]) -> Vec<u8> {
let mut buf = Vec::with_capacity(values.len() * 16);
for (i, v) in values.iter().enumerate() {
if i > 0 {
buf.push(SEP);
}
buf.extend_from_slice(&encode_index_value(v));
}
buf
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_doc_key() {
let key = doc_key("users", b"u1");
assert_eq!(&key[..1], &[DOC_PREFIX]);
assert_eq!(&key[1..6], b"users");
assert_eq!(key[6], SEP);
assert_eq!(&key[7..], b"u1");
}
#[test]
fn test_idx_key() {
let key = idx_key("users", "by_email", b"a@b.com", b"u1");
assert_eq!(&key[..1], &[IDX_PREFIX]);
}
#[test]
fn test_encode_index_value_ordering() {
let values = vec![
Value::Null,
Value::Bool(false),
Value::Bool(true),
json!(-100),
json!(-1),
json!(0),
json!(1),
json!(42),
json!(100),
json!("a"),
json!("b"),
json!("hello"),
];
let encoded: Vec<Vec<u8>> = values.iter().map(encode_index_value).collect();
for i in 0..encoded.len() - 1 {
assert!(
encoded[i] <= encoded[i + 1],
"ordering violation at index {}: {:?} > {:?}",
i,
encoded[i],
encoded[i + 1]
);
}
}
#[test]
#[allow(clippy::approx_constant)]
fn test_encode_index_value_float_ordering() {
let values = [
json!(-3.14),
json!(-1.0),
json!(0.0),
json!(0.5),
json!(1.0),
json!(3.14),
];
let encoded: Vec<Vec<u8>> = values.iter().map(encode_index_value).collect();
for i in 0..encoded.len() - 1 {
assert!(
encoded[i] < encoded[i + 1],
"float ordering violation at index {}",
i
);
}
}
#[test]
fn test_encode_primary_key() {
assert_eq!(encode_primary_key(&json!("hello")).unwrap(), b"hello");
assert_eq!(encode_primary_key(&json!(42)).unwrap(), b"42");
assert_eq!(encode_primary_key(&json!(true)).unwrap(), b"true");
}
#[test]
fn test_extract_field() {
let doc = json!({"user": {"name": "Alice", "tags": ["a", "b"]}});
assert_eq!(extract_field(&doc, "user.name"), Some(json!("Alice")));
assert_eq!(extract_field(&doc, "user.tags.0"), Some(json!("a")));
assert_eq!(extract_field(&doc, "user.tags.1"), Some(json!("b")));
assert_eq!(extract_field(&doc, "missing"), None);
assert_eq!(extract_field(&doc, "user.missing"), None);
assert_eq!(extract_field(&doc, ""), Some(doc.clone()));
}
#[test]
fn test_validate_name() {
assert!(validate_name("users").is_ok());
assert!(validate_name("by_email").is_ok());
assert!(validate_name("store-1").is_ok());
assert!(validate_name("").is_err());
assert!(validate_name("has space").is_err());
assert!(validate_name("has\x00null").is_err());
}
#[test]
fn test_encode_primary_key_rejects_null_bytes() {
let err = encode_primary_key(&json!("a\x00b")).unwrap_err();
let msg = format!("{}", err);
assert!(
msg.contains("null byte") || msg.contains("\\x00"),
"expected null byte error, got: {}",
msg
);
}
#[test]
fn test_encode_decode_doc() {
let doc = json!({"id": 1, "name": "Alice"});
let bytes = encode_doc(&doc).unwrap();
let back = decode_doc(&bytes).unwrap();
assert_eq!(doc, back);
}
}