use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt;
/// A dynamically typed value.
///
/// `PartialEq` is derived, so `Double` payloads are compared with plain `f64`
/// equality: `0.0 == -0.0`, and a NaN payload is never equal to itself.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Value {
/// The absence of a value.
Null,
/// A boolean.
Boolean(bool),
/// A signed 64-bit integer.
Integer(i64),
/// A 64-bit floating-point number.
Double(f64),
/// An owned UTF-8 string.
String(String),
/// A point in time, expressed in UTC.
Timestamp(DateTime<Utc>),
/// An ordered list of values; elements may be of different variants.
Array(Vec<Value>),
/// String-keyed values, kept sorted by key (backed by a `BTreeMap`).
Map(BTreeMap<String, Value>),
}
impl Value {
    /// Reports whether this value is [`Value::Null`].
    pub fn is_null(&self) -> bool {
        matches!(self, Value::Null)
    }

    /// Evaluates the value in a boolean context.
    ///
    /// `Null`, `false`, integer zero, floating-point zero (either sign) and the
    /// empty string are falsy. Everything else is truthy, including NaN
    /// doubles and every timestamp, array and map (even empty ones).
    pub fn is_truthy(&self) -> bool {
        let falsy = match self {
            Value::Null => true,
            Value::Boolean(flag) => !*flag,
            Value::Integer(n) => *n == 0,
            Value::Double(x) => *x == 0.0,
            Value::String(text) => text.is_empty(),
            Value::Timestamp(_) | Value::Array(_) | Value::Map(_) => false,
        };
        !falsy
    }

    /// Returns the value as an `i64` if it is numeric.
    ///
    /// Doubles are converted with an `as` cast: the fraction is discarded,
    /// out-of-range values saturate at the `i64` bounds and NaN becomes `0`.
    /// Non-numeric variants yield `None`.
    pub fn as_i64(&self) -> Option<i64> {
        match *self {
            Value::Integer(n) => Some(n),
            Value::Double(x) => Some(x as i64),
            _ => None,
        }
    }

    /// Returns the value as an `f64` if it is numeric.
    ///
    /// Integers are converted with an `as` cast, which rounds magnitudes that
    /// `f64` cannot represent exactly. Non-numeric variants yield `None`.
    pub fn as_f64(&self) -> Option<f64> {
        match *self {
            Value::Double(x) => Some(x),
            Value::Integer(n) => Some(n as f64),
            _ => None,
        }
    }

    /// Borrows the string payload, or returns `None` for any other variant.
    pub fn as_str(&self) -> Option<&str> {
        if let Value::String(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Returns the boolean payload, or `None` for any other variant.
    ///
    /// No truthiness coercion happens here; see [`Value::is_truthy`] for that.
    pub fn as_bool(&self) -> Option<bool> {
        if let Value::Boolean(flag) = *self {
            Some(flag)
        } else {
            None
        }
    }

    /// Returns the upper-case name of the variant.
    pub fn type_name(&self) -> &'static str {
        match *self {
            Value::Null => "NULL",
            Value::Boolean(..) => "BOOLEAN",
            Value::Integer(..) => "INTEGER",
            Value::Double(..) => "DOUBLE",
            Value::String(..) => "STRING",
            Value::Timestamp(..) => "TIMESTAMP",
            Value::Array(..) => "ARRAY",
            Value::Map(..) => "MAP",
        }
    }
}
// Marker impl asserting that `==` on `Value` is a full equivalence relation.
// NOTE(review): `PartialEq` is derived and `Value::Double` holds an `f64`, so a
// NaN payload compares unequal to itself and reflexivity does not hold for such
// values. Confirm that NaN doubles never reach hash-based or ordered
// collections keyed by `Value`.
impl Eq for Value {}
impl PartialOrd for Value {
    /// Delegates to the `Ord` implementation, so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Total ordering over values.
///
/// * `Null` sorts before everything else.
/// * Integers and doubles are compared numerically with each other (integers
///   are widened with an `as f64` cast, so magnitudes beyond 2^53 may round).
///   NaN sorts after every other number and all NaNs are considered equal, so
///   that the relation stays transitive and is safe to hand to sorting and
///   ordered collections.
/// * Booleans, strings, timestamps, arrays and maps are compared by content
///   with their own `Ord` implementations.
/// * Any other pairing of different variants is ordered by the variants'
///   `type_name()` strings.
///
/// Note that `cmp` can return `Equal` for values that `==` reports as
/// different (for example `Integer(1)` and `Double(1.0)`).
impl Ord for Value {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        // Orders two floats; when either side is NaN the non-NaN side sorts
        // first, and two NaNs compare equal.
        fn cmp_f64(lhs: f64, rhs: f64) -> Ordering {
            lhs.partial_cmp(&rhs)
                .unwrap_or_else(|| lhs.is_nan().cmp(&rhs.is_nan()))
        }

        match (self, other) {
            (Value::Null, Value::Null) => Ordering::Equal,
            (Value::Null, _) => Ordering::Less,
            (_, Value::Null) => Ordering::Greater,
            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
            (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
            (Value::Integer(a), Value::Double(b)) => cmp_f64(*a as f64, *b),
            (Value::Double(a), Value::Integer(b)) => cmp_f64(*a, *b as f64),
            (Value::Double(a), Value::Double(b)) => cmp_f64(*a, *b),
            (Value::String(a), Value::String(b)) => a.cmp(b),
            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
            (Value::Array(a), Value::Array(b)) => a.cmp(b),
            // Compare maps entry by entry; without this arm every pair of
            // maps would fall through to the type-name comparison below and
            // compare as equal.
            (Value::Map(a), Value::Map(b)) => a.cmp(b),
            _ => self.type_name().cmp(other.type_name()),
        }
    }
}
/// Hashes the variant discriminant followed by the payload.
///
/// The implementation is kept consistent with the derived `PartialEq`: values
/// that compare equal with `==` produce the same hash. In particular
/// `Double(0.0)` and `Double(-0.0)` are equal under `f64` comparison, so
/// signed zero is canonicalised before its bits are hashed.
impl std::hash::Hash for Value {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Imported locally so the `.hash(..)` method calls below resolve
        // without depending on a file-level `use`.
        use std::hash::Hash;

        std::mem::discriminant(self).hash(state);
        match self {
            Value::Null => {}
            Value::Boolean(b) => b.hash(state),
            Value::Integer(i) => i.hash(state),
            Value::Double(f) => {
                // `0.0 == -0.0` but their bit patterns differ; hash one
                // canonical zero. NaN payloads are collapsed as well so every
                // NaN hashes alike.
                let canonical = if *f == 0.0 {
                    0.0_f64
                } else if f.is_nan() {
                    f64::NAN
                } else {
                    *f
                };
                canonical.to_bits().hash(state)
            }
            Value::String(s) => s.hash(state),
            Value::Timestamp(t) => t.hash(state),
            Value::Array(a) => a.hash(state),
            // `BTreeMap` hashes its length followed by each entry in key
            // order, which keeps nested maps from blending into neighbours.
            Value::Map(m) => m.hash(state),
        }
    }
}
/// Plain-text rendering of a value.
///
/// Scalars are written with their own `Display` form and `Null` as the
/// literal `NULL`. Arrays and maps are written with the container's `Debug`
/// form, so nested elements appear in their `Debug` representation. Width
/// and alignment flags on the outer formatter are not applied.
impl fmt::Display for Value {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Value::Null => f.write_str("NULL"),
            Value::String(text) => f.write_str(text),
            Value::Boolean(flag) => write!(f, "{flag}"),
            Value::Integer(number) => write!(f, "{number}"),
            Value::Double(number) => write!(f, "{number}"),
            Value::Timestamp(instant) => write!(f, "{instant}"),
            Value::Array(items) => write!(f, "{items:?}"),
            Value::Map(entries) => write!(f, "{entries:?}"),
        }
    }
}
impl From<serde_json::Value> for Value {
fn from(v: serde_json::Value) -> Self {
match v {
serde_json::Value::Null => Value::Null,
serde_json::Value::Bool(b) => Value::Boolean(b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::Integer(i)
} else if let Some(f) = n.as_f64() {
Value::Double(f)
} else {
Value::Null
}
}
serde_json::Value::String(s) => Value::String(s),
serde_json::Value::Array(a) => Value::Array(a.into_iter().map(Value::from).collect()),
serde_json::Value::Object(m) => {
Value::Map(m.into_iter().map(|(k, v)| (k, Value::from(v))).collect())
}
}
}
}
/// Builds a JSON document from a borrowed [`Value`].
///
/// Doubles that `serde_json::Number::from_f64` rejects are emitted as JSON
/// `null`. Timestamps are emitted as RFC 3339 strings. Arrays and maps are
/// converted recursively, cloning strings and keys.
impl From<&Value> for serde_json::Value {
    fn from(v: &Value) -> Self {
        match v {
            Value::Null => Self::Null,
            Value::Boolean(flag) => Self::Bool(*flag),
            Value::Integer(number) => Self::Number(serde_json::Number::from(*number)),
            Value::Double(number) => {
                serde_json::Number::from_f64(*number).map_or(Self::Null, Self::Number)
            }
            Value::String(text) => Self::String(text.clone()),
            Value::Timestamp(instant) => Self::String(instant.to_rfc3339()),
            Value::Array(items) => Self::Array(items.iter().map(Self::from).collect()),
            Value::Map(entries) => Self::Object(
                entries
                    .iter()
                    .map(|(key, item)| (key.clone(), Self::from(item)))
                    .collect(),
            ),
        }
    }
}
/// Declared type of a column or of a nested element.
///
/// The `Display` implementation renders each variant with an upper-case
/// keyword (for example `ARRAY<STRING>`).
// NOTE(review): the bit widths behind `Integer` and `BigInt` are not visible
// in this file; confirm against the code that maps them onto `Value`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DataType {
/// Rendered as `BOOLEAN`.
Boolean,
/// Rendered as `INTEGER`.
Integer,
/// Rendered as `BIGINT`.
BigInt,
/// Rendered as `DOUBLE`.
Double,
/// Rendered as `STRING`.
String,
/// Rendered as `TIMESTAMP`.
Timestamp,
/// A list whose elements all share the boxed element type.
Array(Box<DataType>),
/// A map described by its key type and its value type, in that order.
Map(Box<DataType>, Box<DataType>),
/// A record made of named, typed fields.
Struct(Vec<Column>),
}
/// Renders the type with upper-case keywords, e.g. `MAP<STRING, DOUBLE>` or
/// `STRUCT<id INTEGER, tags ARRAY<STRING>>`.
impl fmt::Display for DataType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataType::Boolean => f.write_str("BOOLEAN"),
            DataType::Integer => f.write_str("INTEGER"),
            DataType::BigInt => f.write_str("BIGINT"),
            DataType::Double => f.write_str("DOUBLE"),
            DataType::String => f.write_str("STRING"),
            DataType::Timestamp => f.write_str("TIMESTAMP"),
            DataType::Array(element) => write!(f, "ARRAY<{element}>"),
            DataType::Map(key, value) => write!(f, "MAP<{key}, {value}>"),
            DataType::Struct(fields) => {
                f.write_str("STRUCT<")?;
                // Write the first field bare, then prefix each later field
                // with the separator.
                let mut remaining = fields.iter();
                if let Some(first) = remaining.next() {
                    write!(f, "{} {}", first.name, first.data_type)?;
                    for field in remaining {
                        write!(f, ", {} {}", field.name, field.data_type)?;
                    }
                }
                f.write_str(">")
            }
        }
    }
}
/// A named, typed field; used both for schema columns and for the members of
/// `DataType::Struct`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Column {
/// Column name as declared.
pub name: String,
/// Declared type of the column.
pub data_type: DataType,
}
impl Column {
pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
Self {
name: name.into(),
data_type,
}
}
}
/// An ordered list of columns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Schema {
/// Columns in positional order; `Schema::index_of` returns positions into this list.
pub columns: Vec<Column>,
}
impl Schema {
pub fn new(columns: Vec<Column>) -> Self {
Self { columns }
}
pub fn empty() -> Self {
Self { columns: vec![] }
}
pub fn index_of(&self, name: &str) -> Option<usize> {
self.columns
.iter()
.position(|c| c.name.eq_ignore_ascii_case(name))
}
pub fn column(&self, name: &str) -> Option<&Column> {
self.columns
.iter()
.find(|c| c.name.eq_ignore_ascii_case(name))
}
pub fn data_type(&self, name: &str) -> Option<&DataType> {
self.column(name).map(|c| &c.data_type)
}
pub fn len(&self) -> usize {
self.columns.len()
}
pub fn is_empty(&self) -> bool {
self.columns.is_empty()
}
}
/// Optional, per-row side information; every field defaults to `None`.
// NOTE(review): the field names suggest message-log coordinates
// (topic/partition/offset/key) and windowing bounds, but nothing in this file
// populates or reads them; confirm the exact semantics against the producers.
#[derive(Debug, Clone, Default)]
pub struct RowMetadata {
/// Name of the topic associated with the row, if any.
pub topic: Option<String>,
/// Partition number associated with the row, if any.
pub partition: Option<u32>,
/// Offset associated with the row, if any.
pub offset: Option<u64>,
/// UTC timestamp associated with the row, if any.
pub timestamp: Option<DateTime<Utc>>,
/// Key associated with the row, if any.
pub key: Option<String>,
/// UTC start of the window the row is associated with, if any.
pub window_start: Option<DateTime<Utc>>,
/// UTC end of the window the row is associated with, if any.
pub window_end: Option<DateTime<Utc>>,
}
/// A positional list of values together with its metadata.
#[derive(Debug, Clone)]
pub struct Row {
/// Values by position.
// NOTE(review): presumably aligned with a `Schema`'s column order; confirm with callers.
pub values: Vec<Value>,
/// Side information attached to the row.
pub metadata: RowMetadata,
}
impl Row {
pub fn new(values: Vec<Value>) -> Self {
Self {
values,
metadata: RowMetadata::default(),
}
}
pub fn with_metadata(values: Vec<Value>, metadata: RowMetadata) -> Self {
Self { values, metadata }
}
pub fn get(&self, index: usize) -> &Value {
self.values.get(index).unwrap_or(&Value::Null)
}
}