use crate::{schema::CqlType, RowKey, Value};
use base64::Engine;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::fmt;
use tokio::sync::mpsc;
fn b64(bytes: &[u8]) -> String {
base64::engine::general_purpose::STANDARD.encode(bytes)
}
/// Returns `true` when `meta` carries a version, a TTL, or at least one tag.
///
/// Callers use this to decide whether a `_metadata` entry is worth emitting.
fn row_metadata_is_populated(meta: &RowMetadata) -> bool {
    let nothing_set = meta.version.is_none() && meta.ttl.is_none() && meta.tags.is_empty();
    !nothing_set
}
/// In-memory outcome of a query: the rows plus summary counters and metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
/// Rows held by this result; `row_count` and `is_empty` are derived from this vector.
pub rows: Vec<QueryRow>,
/// Affected-row counter; reported by `Display` when `rows` is empty.
pub rows_affected: u64,
/// Execution time in milliseconds, as printed in the `Display` footer.
pub execution_time_ms: u64,
/// Column descriptions, performance counters and warnings for this result.
pub metadata: QueryMetadata,
}
/// A single result row: column values keyed by column name, plus its key and metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRow {
/// Column name -> value. A `HashMap`, so iteration order is unspecified.
pub values: HashMap<String, Value>,
/// Key of the row; rendered with `{:?}` under `_key` in the JSON output.
pub key: RowKey,
/// Optional per-row metadata (version, TTL, tags).
pub metadata: RowMetadata,
}
/// Result-level metadata attached to a `QueryResult` or a `QueryResultIterator`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryMetadata {
/// Column descriptions; their order drives JSON key order and table column order.
pub columns: Vec<ColumnInfo>,
/// Total row count, if known. NOTE(review): exact semantics are not visible in this file.
pub total_rows: Option<u64>,
/// Optional description of the plan used for the query.
pub plan_info: Option<PlanInfo>,
/// Timing, memory, I/O and cache counters.
pub performance: PerformanceMetrics,
/// Warning messages; `QueryResult::add_warning` appends here.
pub warnings: Vec<String>,
}
/// Description of one result column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
/// Column name; used as the lookup key into `QueryRow::values`.
pub name: String,
/// Engine-level data type of the column.
pub data_type: crate::types::DataType,
/// Nullability flag as supplied to `ColumnInfo::new`.
pub nullable: bool,
/// Position value as supplied to `ColumnInfo::new` (presumably the column index — TODO confirm).
pub position: usize,
/// Owning table name, when set via `with_table_name`.
pub table_name: Option<String>,
/// Full CQL type, when set via `with_cql_type`; left out of serde output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub cql_type: Option<CqlType>,
}
/// Optional per-row metadata. The default value has no version, no TTL and no tags.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RowMetadata {
/// Row version, if any.
pub version: Option<u64>,
/// Time-to-live, if any. NOTE(review): the unit is not visible in this file.
pub ttl: Option<u64>,
/// Free-form string tags.
pub tags: HashMap<String, String>,
}
/// Description of a query plan, carried in `QueryMetadata::plan_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanInfo {
/// Name of the plan type.
pub plan_type: String,
/// Cost estimate. NOTE(review): the cost unit is not visible in this file.
pub estimated_cost: f64,
/// Measured cost, in the same (unspecified) unit as `estimated_cost` — TODO confirm.
pub actual_cost: f64,
/// Names of the indexes used.
pub indexes_used: Vec<String>,
/// Textual plan steps.
pub steps: Vec<String>,
/// Parallel-execution details, when present.
pub parallelization: Option<ParallelizationInfo>,
}
/// Parallel-execution details attached to a `PlanInfo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelizationInfo {
/// Number of threads used.
pub threads_used: usize,
/// Effectiveness flag. NOTE(review): the criterion that sets it is not visible in this file.
pub effective: bool,
/// Per-partition statistics.
pub partitions: Vec<PartitionInfo>,
}
/// Statistics for one partition listed in `ParallelizationInfo::partitions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionInfo {
/// Partition identifier.
pub id: usize,
/// Number of rows processed in this partition.
pub rows_processed: u64,
/// Processing time for this partition, in milliseconds.
pub processing_time_ms: u64,
}
/// Performance counters for a query. All counters default to zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PerformanceMetrics {
/// Parse time in microseconds.
pub parse_time_us: u64,
/// Planning time in microseconds.
pub planning_time_us: u64,
/// Execution time in microseconds.
pub execution_time_us: u64,
/// Total time in microseconds; `total_time_ms` divides this by 1000.
pub total_time_us: u64,
/// Memory usage in bytes.
pub memory_usage_bytes: u64,
/// Number of I/O operations.
pub io_operations: u64,
/// Cache hit count; numerator of `cache_hit_ratio`.
pub cache_hits: u64,
/// Cache miss count; added to `cache_hits` to form the ratio's denominator.
pub cache_misses: u64,
}
/// Tuning knobs for streamed result delivery.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
/// Buffer size (presumably the capacity of the row channel — TODO confirm against callers).
pub buffer_size: usize,
/// Chunk size (presumably rows per chunk — TODO confirm against callers).
pub chunk_size: usize,
}
impl Default for StreamingConfig {
    /// Default configuration: `buffer_size` of 1024 and `chunk_size` of 10 000.
    fn default() -> Self {
        Self::new(1024, 10_000)
    }
}
impl StreamingConfig {
    /// Builds a configuration from explicit `buffer_size` and `chunk_size` values.
    pub fn new(buffer_size: usize, chunk_size: usize) -> Self {
        Self {
            chunk_size,
            buffer_size,
        }
    }

    /// Preset for Parquet output: `buffer_size` 1024, `chunk_size` 10 000.
    pub fn for_parquet() -> Self {
        Self::new(1024, 10_000)
    }

    /// Preset for text formats: `buffer_size` 512, `chunk_size` 5 000.
    pub fn for_text_formats() -> Self {
        Self::new(512, 5_000)
    }
}
/// Streaming counterpart of `QueryResult`: rows arrive one at a time over a tokio mpsc channel.
pub struct QueryResultIterator {
// Receiving end of the channel; each message is either a row or an error.
receiver: mpsc::Receiver<Result<QueryRow, crate::Error>>,
/// Result-level metadata supplied at construction.
pub metadata: QueryMetadata,
/// Optional expected row total; only used by `progress_percent`.
pub total_rows_hint: Option<u64>,
// Number of `Ok` rows pulled from the channel so far.
rows_received: u64,
}
impl QueryResultIterator {
    /// Upper bound on the number of rows a single `collect_chunk` call returns.
    const MAX_CHUNK_SIZE: usize = 100_000;

    /// Wraps `receiver` and `metadata` into an iterator with no total hint and a zeroed
    /// row counter.
    pub fn new(
        receiver: mpsc::Receiver<Result<QueryRow, crate::Error>>,
        metadata: QueryMetadata,
    ) -> Self {
        Self {
            rows_received: 0,
            total_rows_hint: None,
            metadata,
            receiver,
        }
    }

    /// Builder-style setter for the expected total row count.
    pub fn with_total_hint(mut self, total: u64) -> Self {
        self.total_rows_hint.replace(total);
        self
    }

    /// Awaits the next channel message.
    ///
    /// Returns `None` once the channel is closed and drained. Only `Ok` rows are counted
    /// towards `rows_received`; errors are passed through uncounted.
    pub async fn next_async(&mut self) -> Option<Result<QueryRow, crate::Error>> {
        let message = self.receiver.recv().await;
        if let Some(Ok(_)) = &message {
            self.rows_received += 1;
        }
        message
    }

    /// Collects up to `size` rows (capped at `MAX_CHUNK_SIZE`) into a vector.
    ///
    /// Stops early when the channel closes. The first error received is returned
    /// immediately and the rows gathered so far in this call are dropped.
    pub async fn collect_chunk(&mut self, size: usize) -> Result<Vec<QueryRow>, crate::Error> {
        let limit = std::cmp::min(size, Self::MAX_CHUNK_SIZE);
        let mut rows = Vec::new();
        loop {
            if rows.len() >= limit {
                break;
            }
            match self.receiver.recv().await {
                None => break,
                Some(Err(error)) => return Err(error),
                Some(Ok(row)) => {
                    self.rows_received += 1;
                    rows.push(row);
                }
            }
        }
        Ok(rows)
    }

    /// Number of `Ok` rows pulled from the channel so far.
    pub fn rows_received(&self) -> u64 {
        self.rows_received
    }

    /// Progress as a percentage of the total hint, or `None` when no hint was set.
    ///
    /// A hint of zero reports 100.0. The value is not clamped, so it exceeds 100.0 when
    /// more rows arrive than the hint announced.
    pub fn progress_percent(&self) -> Option<f64> {
        let total = self.total_rows_hint?;
        if total == 0 {
            return Some(100.0);
        }
        Some((self.rows_received as f64 / total as f64) * 100.0)
    }
}
impl QueryResult {
    /// Creates an empty result: no rows, zeroed counters, default metadata.
    pub fn new() -> Self {
        Self {
            metadata: QueryMetadata::default(),
            execution_time_ms: 0,
            rows_affected: 0,
            rows: Vec::new(),
        }
    }

    /// Creates a result holding `rows`; every other field is as in [`QueryResult::new`].
    pub fn with_rows(rows: Vec<QueryRow>) -> Self {
        let mut result = Self::new();
        result.rows = rows;
        result
    }

    /// Creates a row-less result reporting `rows_affected`.
    pub fn with_affected_rows(rows_affected: u64) -> Self {
        let mut result = Self::new();
        result.rows_affected = rows_affected;
        result
    }

    /// Number of rows held.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// `true` when no rows are held.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Row at `index`, or `None` when out of range.
    pub fn get_row(&self, index: usize) -> Option<&QueryRow> {
        self.rows.as_slice().get(index)
    }

    /// Column descriptions from the metadata.
    pub fn columns(&self) -> &[ColumnInfo] {
        self.metadata.columns.as_slice()
    }

    /// Owned copies of the column names, in metadata order.
    pub fn column_names(&self) -> Vec<String> {
        let columns = &self.metadata.columns;
        let mut names = Vec::with_capacity(columns.len());
        for column in columns {
            names.push(column.name.clone());
        }
        names
    }

    /// Execution time in milliseconds.
    pub fn execution_time(&self) -> u64 {
        self.execution_time_ms
    }

    /// Performance counters from the metadata.
    pub fn performance(&self) -> &PerformanceMetrics {
        &self.metadata.performance
    }

    /// Warnings recorded so far.
    pub fn warnings(&self) -> &[String] {
        self.metadata.warnings.as_slice()
    }

    /// Appends `warning` to the metadata warnings.
    pub fn add_warning(&mut self, warning: String) {
        self.metadata.warnings.push(warning);
    }

    /// Converts the whole result into a JSON object with the keys `rows`,
    /// `rows_affected`, `row_count`, `columns`, `performance` and `warnings`.
    pub fn to_json(&self) -> serde_json::Value {
        let mut rows = Vec::with_capacity(self.rows.len());
        for row in &self.rows {
            rows.push(self.row_to_json_deterministic(row));
        }

        let mut columns = Vec::with_capacity(self.metadata.columns.len());
        for column in &self.metadata.columns {
            columns.push(ColumnInfo::to_json(column));
        }

        let mut warnings = Vec::with_capacity(self.metadata.warnings.len());
        for warning in &self.metadata.warnings {
            warnings.push(serde_json::Value::String(warning.clone()));
        }

        json!({
            "rows": rows,
            "rows_affected": self.rows_affected,
            "row_count": self.rows.len(),
            "columns": columns,
            "performance": self.metadata.performance.to_json(),
            "warnings": warnings,
        })
    }

    /// Borrowing iterator over the rows.
    pub fn iter(&self) -> std::slice::Iter<'_, QueryRow> {
        self.rows.as_slice().iter()
    }

    /// Converts one row to JSON with a reproducible key insertion order.
    ///
    /// When column metadata exists, exactly those columns are emitted in metadata order
    /// and missing values become `null`. Without column metadata, the row's own columns
    /// are emitted sorted by name. `_key` is always added; `_metadata` only when the row
    /// metadata is populated.
    fn row_to_json_deterministic(&self, row: &QueryRow) -> serde_json::Value {
        let columns = &self.metadata.columns;
        let mut object = serde_json::Map::new();

        if columns.is_empty() {
            // No schema available: fall back to the row's own columns, ordered by name.
            let mut entries: Vec<_> = row.values.iter().collect();
            entries.sort_by(|left, right| left.0.cmp(right.0));
            for (name, value) in entries {
                object.insert(name.clone(), value.to_json());
            }
        } else {
            for column in columns {
                let cell = match row.values.get(&column.name) {
                    Some(value) => ToJson::to_json(value),
                    None => serde_json::Value::Null,
                };
                object.insert(column.name.clone(), cell);
            }
        }

        let key_text = format!("{:?}", row.key);
        object.insert("_key".to_string(), serde_json::Value::String(key_text));

        if row_metadata_is_populated(&row.metadata) {
            object.insert("_metadata".to_string(), row.metadata.to_json());
        }
        serde_json::Value::Object(object)
    }
}
impl QueryRow {
    /// Creates a row with the given key, no values and default metadata.
    pub fn new(key: RowKey) -> Self {
        Self::with_values(key, HashMap::new())
    }

    /// Creates a row with the given key and values, and default metadata.
    pub fn with_values(key: RowKey, values: HashMap<String, Value>) -> Self {
        Self {
            metadata: RowMetadata::default(),
            key,
            values,
        }
    }

    /// Value stored for `column`, if any.
    pub fn get(&self, column: &str) -> Option<&Value> {
        self.values.get(column)
    }

    /// Stores `value` under `column`, replacing any previous value.
    pub fn set(&mut self, column: String, value: Value) {
        let _previous = self.values.insert(column, value);
    }

    /// Owned copies of the column names, in unspecified (`HashMap`) order.
    pub fn column_names(&self) -> Vec<String> {
        let mut names = Vec::with_capacity(self.values.len());
        for name in self.values.keys() {
            names.push(name.clone());
        }
        names
    }

    /// The row key.
    pub fn key(&self) -> &RowKey {
        &self.key
    }

    /// The row metadata.
    pub fn metadata(&self) -> &RowMetadata {
        &self.metadata
    }

    /// Replaces the row metadata.
    pub fn set_metadata(&mut self, metadata: RowMetadata) {
        self.metadata = metadata;
    }

    /// Converts the row into a JSON object: one entry per column, plus `_key` (the
    /// `Debug` rendering of the key) and, when populated, `_metadata`.
    pub fn to_json(&self) -> serde_json::Value {
        let mut object: serde_json::Map<_, _> = self
            .values
            .iter()
            .map(|(column, value)| (column.clone(), value.to_json()))
            .collect();

        let key_text = format!("{:?}", self.key);
        object.insert("_key".to_string(), serde_json::Value::String(key_text));

        if row_metadata_is_populated(&self.metadata) {
            object.insert("_metadata".to_string(), self.metadata.to_json());
        }
        serde_json::Value::Object(object)
    }
}
impl ColumnInfo {
    /// Creates a column description with no table name and no CQL type.
    pub fn new(
        name: String,
        data_type: crate::types::DataType,
        nullable: bool,
        position: usize,
    ) -> Self {
        Self {
            cql_type: None,
            table_name: None,
            position,
            nullable,
            data_type,
            name,
        }
    }

    /// Builder-style setter for the owning table name.
    pub fn with_table_name(mut self, table_name: String) -> Self {
        self.table_name.replace(table_name);
        self
    }

    /// Builder-style setter for the full CQL type.
    pub fn with_cql_type(mut self, cql_type: CqlType) -> Self {
        self.cql_type.replace(cql_type);
        self
    }

    /// Converts the description into a JSON object.
    ///
    /// `data_type` is the `Debug` rendering of the type; `table_name` and `cql_type`
    /// are only present when set, the latter in CQL notation (e.g. `list<int>`).
    pub fn to_json(&self) -> serde_json::Value {
        let data_type_text = format!("{:?}", self.data_type);

        let mut object = serde_json::Map::new();
        object.insert(String::from("name"), json!(self.name));
        object.insert(String::from("data_type"), json!(data_type_text));
        object.insert(String::from("nullable"), json!(self.nullable));
        object.insert(String::from("position"), json!(self.position));

        if let Some(table) = self.table_name.as_ref() {
            object.insert(String::from("table_name"), json!(table));
        }
        if let Some(cql) = self.cql_type.as_ref() {
            object.insert(String::from("cql_type"), json!(format_cql_type(cql)));
        }
        serde_json::Value::Object(object)
    }
}
/// Maps a CQL type onto the coarser engine `DataType`.
///
/// `frozen<T>` maps like `T`; collections, tuples and UDTs map to their container
/// kind without element types; several CQL types share a `DataType` (see the arms).
pub fn cql_type_to_data_type(cql_type: &CqlType) -> crate::types::DataType {
    use crate::types::DataType as Target;
    match cql_type {
        // Wrapper: look through `frozen<...>`.
        CqlType::Frozen(wrapped) => cql_type_to_data_type(wrapped),

        // Containers and user-defined types.
        CqlType::List(_) => Target::List,
        CqlType::Set(_) => Target::Set,
        CqlType::Map(_, _) => Target::Map,
        CqlType::Tuple(_) => Target::Tuple,
        CqlType::Udt(_, _) => Target::Udt,

        // Scalars with a dedicated target type.
        CqlType::Boolean => Target::Boolean,
        CqlType::TinyInt => Target::TinyInt,
        CqlType::SmallInt => Target::SmallInt,
        CqlType::Int => Target::Integer,
        CqlType::Float => Target::Float32,
        CqlType::Timestamp => Target::Timestamp,

        // Scalars that share a target type.
        CqlType::BigInt
        | CqlType::Varint
        | CqlType::Counter
        | CqlType::Date
        | CqlType::Time
        | CqlType::Duration
        | CqlType::Inet => Target::BigInt,
        CqlType::Double | CqlType::Decimal => Target::Float,
        CqlType::Text | CqlType::Varchar | CqlType::Ascii => Target::Text,
        CqlType::Uuid | CqlType::TimeUuid => Target::Uuid,
        CqlType::Blob | CqlType::Custom(_) => Target::Blob,
    }
}
/// Renders a CQL type in CQL notation, e.g. `map<text, bigint>` or `frozen<list<int>>`.
///
/// UDTs and custom types are rendered as their bare name.
fn format_cql_type(cql_type: &CqlType) -> String {
    // Parameterised and named types build their own string and return early;
    // everything else resolves to a fixed keyword.
    let keyword = match cql_type {
        CqlType::List(element) => return format!("list<{}>", format_cql_type(element)),
        CqlType::Set(element) => return format!("set<{}>", format_cql_type(element)),
        CqlType::Map(key, value) => {
            return format!("map<{}, {}>", format_cql_type(key), format_cql_type(value));
        }
        CqlType::Tuple(members) => {
            let rendered: Vec<String> = members.iter().map(format_cql_type).collect();
            return format!("tuple<{}>", rendered.join(", "));
        }
        CqlType::Frozen(wrapped) => return format!("frozen<{}>", format_cql_type(wrapped)),
        CqlType::Udt(name, _) => return name.clone(),
        CqlType::Custom(name) => return name.clone(),

        CqlType::Boolean => "boolean",
        CqlType::TinyInt => "tinyint",
        CqlType::SmallInt => "smallint",
        CqlType::Int => "int",
        CqlType::BigInt => "bigint",
        CqlType::Counter => "counter",
        CqlType::Float => "float",
        CqlType::Double => "double",
        CqlType::Decimal => "decimal",
        CqlType::Text => "text",
        CqlType::Varchar => "varchar",
        CqlType::Ascii => "ascii",
        CqlType::Blob => "blob",
        CqlType::Timestamp => "timestamp",
        CqlType::Date => "date",
        CqlType::Time => "time",
        CqlType::Uuid => "uuid",
        CqlType::TimeUuid => "timeuuid",
        CqlType::Inet => "inet",
        CqlType::Duration => "duration",
        CqlType::Varint => "varint",
    };
    keyword.to_string()
}
impl RowMetadata {
    /// Creates empty metadata (same as `Default`).
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Builder-style setter for the version.
    pub fn with_version(mut self, version: u64) -> Self {
        self.version.replace(version);
        self
    }

    /// Builder-style setter for the TTL.
    pub fn with_ttl(mut self, ttl: u64) -> Self {
        self.ttl.replace(ttl);
        self
    }

    /// Builder-style insertion of one tag; an existing tag with the same key is replaced.
    pub fn with_tag(mut self, key: String, value: String) -> Self {
        let _previous = self.tags.insert(key, value);
        self
    }

    /// Converts the metadata into a JSON object containing only the parts that are set:
    /// `version`, `ttl`, and `tags` (when non-empty).
    pub fn to_json(&self) -> serde_json::Value {
        let version = self.version.map(|v| ("version".to_string(), json!(v)));
        let ttl = self.ttl.map(|t| ("ttl".to_string(), json!(t)));
        let tags = if self.tags.is_empty() {
            None
        } else {
            Some(("tags".to_string(), json!(self.tags)))
        };

        let object: serde_json::Map<String, serde_json::Value> =
            version.into_iter().chain(ttl).chain(tags).collect();
        serde_json::Value::Object(object)
    }
}
impl PerformanceMetrics {
    /// Creates zeroed metrics (same as `Default`).
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Total time in whole milliseconds (microseconds divided by 1000, truncated).
    pub fn total_time_ms(&self) -> u64 {
        const MICROS_PER_MILLI: u64 = 1000;
        self.total_time_us / MICROS_PER_MILLI
    }

    /// Fraction of cache lookups that hit, in `0.0..=1.0`; `0.0` when there were none.
    pub fn cache_hit_ratio(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 0.0,
            lookups => self.cache_hits as f64 / lookups as f64,
        }
    }

    /// Converts the metrics into a JSON object: every counter plus the derived
    /// `cache_hit_ratio` (falls back to `0` if the ratio is not representable in JSON).
    pub fn to_json(&self) -> serde_json::Value {
        let ratio = match serde_json::Number::from_f64(self.cache_hit_ratio()) {
            Some(number) => serde_json::Value::Number(number),
            None => json!(0),
        };
        json!({
            "parse_time_us": self.parse_time_us,
            "planning_time_us": self.planning_time_us,
            "execution_time_us": self.execution_time_us,
            "total_time_us": self.total_time_us,
            "memory_usage_bytes": self.memory_usage_bytes,
            "io_operations": self.io_operations,
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "cache_hit_ratio": ratio,
        })
    }
}
/// Writes one horizontal table border line, terminated by a newline.
///
/// The line is `left`, then for each column a run of `─` of length `width + 2`
/// (the cell plus one space of padding on each side), with `sep` between adjacent
/// columns, then `right`.
fn write_border(
    f: &mut fmt::Formatter<'_>,
    widths: &[usize],
    left: char,
    sep: char,
    right: char,
) -> fmt::Result {
    write!(f, "{}", left)?;
    let mut columns = widths.iter().peekable();
    while let Some(width) = columns.next() {
        let segment = "─".repeat(width + 2);
        write!(f, "{}", segment)?;
        // A separator goes between columns only, never after the last one.
        if columns.peek().is_some() {
            write!(f, "{}", sep)?;
        }
    }
    writeln!(f, "{}", right)
}
impl fmt::Display for QueryResult {
    /// Renders the result as a box-drawing table followed by a summary line and any
    /// warnings.
    ///
    /// * No rows: prints `Empty result set (N rows affected)`.
    /// * Rows but no column metadata: prints `No columns in result set`.
    /// * Otherwise: one table column per metadata column; a row that lacks a value for
    ///   a column shows `NULL` in that cell.
    ///
    /// Column widths are measured in characters (the unit `{:width$}` pads in) and take
    /// the `NULL` placeholder into account, so borders stay aligned for non-ASCII text
    /// and for rows with missing values.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.rows.is_empty() {
            return write!(f, "Empty result set ({} rows affected)", self.rows_affected);
        }
        let column_names = self.column_names();
        if column_names.is_empty() {
            return write!(f, "No columns in result set");
        }

        // Render every cell exactly once; the same strings are used both for measuring
        // and for printing, so the two can never disagree.
        let table: Vec<Vec<String>> = self
            .rows
            .iter()
            .map(|row| {
                column_names
                    .iter()
                    .map(|name| {
                        row.values
                            .get(name)
                            .map_or_else(|| "NULL".to_string(), |value| value.to_string())
                    })
                    .collect()
            })
            .collect();

        // Start from the header widths, then widen each column to its widest cell.
        let mut col_widths: Vec<usize> = column_names
            .iter()
            .map(|name| name.chars().count())
            .collect();
        for cells in &table {
            for (width, cell) in col_widths.iter_mut().zip(cells) {
                *width = (*width).max(cell.chars().count());
            }
        }

        // `column_names` is non-empty here, so this cannot underflow.
        let last = column_names.len() - 1;

        write_border(f, &col_widths, '┌', '┬', '┐')?;
        write!(f, "│")?;
        for (i, (name, width)) in column_names.iter().zip(&col_widths).enumerate() {
            write!(f, " {:width$} ", name, width = *width)?;
            if i < last {
                write!(f, "│")?;
            }
        }
        writeln!(f, "│")?;
        write_border(f, &col_widths, '├', '┼', '┤')?;

        for cells in &table {
            write!(f, "│")?;
            for (i, (cell, width)) in cells.iter().zip(&col_widths).enumerate() {
                write!(f, " {:width$} ", cell, width = *width)?;
                if i < last {
                    write!(f, "│")?;
                }
            }
            writeln!(f, "│")?;
        }
        write_border(f, &col_widths, '└', '┴', '┘')?;

        writeln!(
            f,
            "{} rows returned in {}ms",
            self.rows.len(),
            self.execution_time_ms
        )?;
        if !self.metadata.warnings.is_empty() {
            writeln!(f, "\nWarnings:")?;
            for warning in &self.metadata.warnings {
                writeln!(f, " - {}", warning)?;
            }
        }
        Ok(())
    }
}
impl Default for QueryResult {
fn default() -> Self {
Self::new()
}
}
impl IntoIterator for QueryResult {
type Item = QueryRow;
type IntoIter = std::vec::IntoIter<QueryRow>;
fn into_iter(self) -> Self::IntoIter {
self.rows.into_iter()
}
}
/// Borrowing iteration: `for row in &result` yields `&QueryRow` in order.
impl<'a> IntoIterator for &'a QueryResult {
    type Item = &'a QueryRow;
    type IntoIter = std::slice::Iter<'a, QueryRow>;

    fn into_iter(self) -> Self::IntoIter {
        self.rows.as_slice().iter()
    }
}
/// Module-private conversion into a `serde_json::Value`; implemented here for `Value`.
trait ToJson {
/// Returns the JSON representation of `self`.
fn to_json(&self) -> serde_json::Value;
}
/// JSON rendering of every `Value` variant, used by the row/result `to_json` methods.
impl ToJson for Value {
/// Converts this value to JSON. Scalars map to JSON scalars, byte payloads are
/// base64-encoded strings, sequences become arrays, and maps, UDTs, decimals,
/// durations and tombstones become objects.
fn to_json(&self) -> serde_json::Value {
// Floats that `Number::from_f64` rejects (it returns `None`) are emitted as `null`.
fn float_to_json(x: f64) -> serde_json::Value {
serde_json::Number::from_f64(x)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null)
}
match self {
Value::Null => serde_json::Value::Null,
Value::Boolean(b) => json!(*b),
Value::Integer(i) => json!(*i),
Value::BigInt(i) => json!(*i),
Value::Counter(c) => json!(*c),
// Narrow integers are widened to i64 before serialization.
Value::TinyInt(i) => json!(*i as i64),
Value::SmallInt(i) => json!(*i as i64),
// Date/time values are emitted as their raw inner value, not as formatted text.
Value::Date(d) => json!(*d),
Value::Time(t) => json!(*t),
Value::Timestamp(ts) => json!(*ts),
Value::Float(f) => float_to_json(*f),
Value::Float32(f) => float_to_json(*f as f64),
Value::Text(s) => json!(s),
// Already JSON: cloned through unchanged.
Value::Json(value) => value.clone(),
// Byte payloads (including UUIDs below) are base64 strings.
Value::Blob(bytes) | Value::Varint(bytes) | Value::Inet(bytes) => json!(b64(bytes)),
Value::Uuid(uuid) => json!(b64(uuid)),
// Lists, sets and tuples all become a JSON array of converted elements.
Value::List(items) | Value::Set(items) | Value::Tuple(items) => {
let json_list: Vec<_> = items.iter().map(ToJson::to_json).collect();
serde_json::Value::Array(json_list)
}
// Map keys are stringified with `Display`, since JSON object keys must be strings.
Value::Map(entries) => {
let json_map: serde_json::Map<String, serde_json::Value> = entries
.iter()
.map(|(k, v)| (format!("{}", k), v.to_json()))
.collect();
serde_json::Value::Object(json_map)
}
// UDTs become an object with a `_type` entry plus one entry per field;
// fields without a value are `null`.
Value::Udt(udt) => {
let mut json_obj = serde_json::Map::new();
json_obj.insert("_type".to_string(), json!(udt.type_name));
for field in &udt.fields {
let field_json = field
.value
.as_ref()
.map_or(serde_json::Value::Null, ToJson::to_json);
json_obj.insert(field.name.clone(), field_json);
}
serde_json::Value::Object(json_obj)
}
// Frozen wrappers are transparent.
Value::Frozen(boxed) => boxed.to_json(),
// Decimals keep their parts: the scale plus the base64-encoded unscaled bytes.
Value::Decimal { scale, unscaled } => json!({
"scale": *scale,
"unscaled": b64(unscaled),
}),
Value::Duration {
months,
days,
nanos,
} => json!({
"months": *months,
"days": *days,
"nanos": *nanos,
}),
// Tombstones are described rather than hidden; `ttl` is only present when set.
Value::Tombstone(info) => {
let mut json_obj = serde_json::Map::new();
json_obj.insert("type".to_string(), json!("tombstone"));
json_obj.insert("deletion_time".to_string(), json!(info.deletion_time));
json_obj.insert(
"tombstone_type".to_string(),
json!(format!("{:?}", info.tombstone_type)),
);
if let Some(ttl) = info.ttl {
json_obj.insert("ttl".to_string(), json!(ttl));
}
serde_json::Value::Object(json_obj)
}
}
}
}
// Unit tests for the result types, the CQL type mapping and the JSON conversions.
#[cfg(test)]
mod tests {
use super::*;
use crate::Value;
// A fresh result has no rows and a zero execution time.
#[test]
fn test_query_result_creation() {
let result = QueryResult::new();
assert!(result.is_empty());
assert_eq!(result.row_count(), 0);
assert_eq!(result.execution_time(), 0);
}
// `with_rows` keeps the rows, and values can be read back by index and column name.
#[test]
fn test_query_result_with_rows() {
let mut row1 = QueryRow::new(RowKey::new(vec![1]));
row1.set("id".to_string(), Value::Integer(1));
row1.set("name".to_string(), Value::Text("Alice".to_string()));
let mut row2 = QueryRow::new(RowKey::new(vec![2]));
row2.set("id".to_string(), Value::Integer(2));
row2.set("name".to_string(), Value::Text("Bob".to_string()));
let result = QueryResult::with_rows(vec![row1, row2]);
assert_eq!(result.row_count(), 2);
assert!(!result.is_empty());
let first_row = result.get_row(0).unwrap();
assert_eq!(first_row.get("id"), Some(&Value::Integer(1)));
assert_eq!(
first_row.get("name"),
Some(&Value::Text("Alice".to_string()))
);
}
// `set`/`get` round-trip, unknown columns yield `None`, and `column_names` lists every column.
#[test]
fn test_query_row_operations() {
let mut row = QueryRow::new(RowKey::new(vec![1]));
row.set("id".to_string(), Value::Integer(42));
row.set("active".to_string(), Value::Boolean(true));
assert_eq!(row.get("id"), Some(&Value::Integer(42)));
assert_eq!(row.get("active"), Some(&Value::Boolean(true)));
assert_eq!(row.get("nonexistent"), None);
let column_names = row.column_names();
assert_eq!(column_names.len(), 2);
assert!(column_names.contains(&"id".to_string()));
assert!(column_names.contains(&"active".to_string()));
}
// `ColumnInfo::new` plus `with_table_name` populate the expected fields; `cql_type` stays unset.
#[test]
fn test_column_info() {
let column = ColumnInfo::new(
"user_id".to_string(),
crate::types::DataType::Integer,
false,
0,
)
.with_table_name("users".to_string());
assert_eq!(column.name, "user_id");
assert_eq!(column.data_type, crate::types::DataType::Integer);
assert!(!column.nullable);
assert_eq!(column.position, 0);
assert_eq!(column.table_name, Some("users".to_string()));
assert!(column.cql_type.is_none());
}
// `with_cql_type` stores a scalar CQL type without touching `data_type`.
#[test]
fn test_column_info_with_cql_type_scalar() {
use crate::schema::CqlType;
let column = ColumnInfo::new("ts".to_string(), crate::types::DataType::Timestamp, true, 1)
.with_cql_type(CqlType::Timestamp);
assert_eq!(column.name, "ts");
assert_eq!(column.cql_type, Some(CqlType::Timestamp));
assert_eq!(column.data_type, crate::types::DataType::Timestamp);
}
// `with_cql_type` stores a list type intact.
#[test]
fn test_column_info_with_cql_type_list() {
use crate::schema::CqlType;
let list_type = CqlType::List(Box::new(CqlType::Int));
let column = ColumnInfo::new("items".to_string(), crate::types::DataType::List, true, 2)
.with_cql_type(list_type.clone());
assert_eq!(column.cql_type, Some(list_type));
}
// `with_cql_type` stores a map type intact.
#[test]
fn test_column_info_with_cql_type_map() {
use crate::schema::CqlType;
let map_type = CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::BigInt));
let column = ColumnInfo::new("props".to_string(), crate::types::DataType::Map, true, 3)
.with_cql_type(map_type.clone());
assert_eq!(column.cql_type, Some(map_type));
}
// `with_cql_type` stores a UDT type intact.
#[test]
fn test_column_info_with_cql_type_udt() {
use crate::schema::CqlType;
let udt_type = CqlType::Udt("address".to_string(), vec![]);
let column = ColumnInfo::new("addr".to_string(), crate::types::DataType::Udt, true, 4)
.with_cql_type(udt_type.clone());
assert_eq!(column.cql_type, Some(udt_type));
}
// Scalar CQL types map to their matching `DataType`.
#[test]
fn test_cql_type_to_data_type_scalars() {
use super::cql_type_to_data_type;
use crate::schema::CqlType;
use crate::types::DataType;
assert_eq!(cql_type_to_data_type(&CqlType::Boolean), DataType::Boolean);
assert_eq!(cql_type_to_data_type(&CqlType::Int), DataType::Integer);
assert_eq!(cql_type_to_data_type(&CqlType::BigInt), DataType::BigInt);
assert_eq!(cql_type_to_data_type(&CqlType::Text), DataType::Text);
assert_eq!(cql_type_to_data_type(&CqlType::Blob), DataType::Blob);
assert_eq!(cql_type_to_data_type(&CqlType::Uuid), DataType::Uuid);
assert_eq!(
cql_type_to_data_type(&CqlType::Timestamp),
DataType::Timestamp
);
}
// Collection CQL types map to their container `DataType`, ignoring element types.
#[test]
fn test_cql_type_to_data_type_collections() {
use super::cql_type_to_data_type;
use crate::schema::CqlType;
use crate::types::DataType;
assert_eq!(
cql_type_to_data_type(&CqlType::List(Box::new(CqlType::Int))),
DataType::List
);
assert_eq!(
cql_type_to_data_type(&CqlType::Set(Box::new(CqlType::Text))),
DataType::Set
);
assert_eq!(
cql_type_to_data_type(&CqlType::Map(
Box::new(CqlType::Text),
Box::new(CqlType::BigInt)
)),
DataType::Map
);
}
// `frozen<T>` maps exactly like `T`.
#[test]
fn test_cql_type_to_data_type_frozen() {
use super::cql_type_to_data_type;
use crate::schema::CqlType;
use crate::types::DataType;
assert_eq!(
cql_type_to_data_type(&CqlType::Frozen(Box::new(CqlType::List(Box::new(
CqlType::Int
))))),
DataType::List
);
}
// The JSON form of a column includes `cql_type` in CQL notation when one is set.
#[test]
fn test_column_info_to_json_includes_cql_type() {
use crate::schema::CqlType;
let column = ColumnInfo::new("items".to_string(), crate::types::DataType::List, true, 0)
.with_cql_type(CqlType::List(Box::new(CqlType::Int)));
let json = column.to_json();
let obj = json.as_object().unwrap();
assert_eq!(obj["name"], "items");
assert!(obj.contains_key("data_type"));
assert!(obj.contains_key("nullable"));
assert!(obj.contains_key("position"));
assert!(obj.contains_key("cql_type"));
assert_eq!(obj["cql_type"], "list<int>");
}
// The JSON form of a column omits `cql_type` when none is set.
#[test]
fn test_column_info_to_json_no_cql_type() {
let column = ColumnInfo::new("id".to_string(), crate::types::DataType::Integer, false, 0);
let json = column.to_json();
let obj = json.as_object().unwrap();
assert!(!obj.contains_key("cql_type"));
}
// The `RowMetadata` builder methods set version, TTL and tags.
#[test]
fn test_row_metadata() {
let metadata = RowMetadata::new()
.with_version(123)
.with_ttl(3600)
.with_tag("source".to_string(), "import".to_string());
assert_eq!(metadata.version, Some(123));
assert_eq!(metadata.ttl, Some(3600));
assert_eq!(metadata.tags.get("source"), Some(&"import".to_string()));
}
// The derived metrics: 8 hits out of 10 lookups is 0.8, and 5000 us is 5 ms.
#[test]
fn test_performance_metrics() {
let mut metrics = PerformanceMetrics::new();
metrics.cache_hits = 8;
metrics.cache_misses = 2;
metrics.total_time_us = 5000;
assert_eq!(metrics.cache_hit_ratio(), 0.8);
assert_eq!(metrics.total_time_ms(), 5);
}
// `QueryRow::to_json` yields an object with one entry per column value.
#[test]
fn test_json_serialization() {
let mut row = QueryRow::new(RowKey::new(vec![1]));
row.set("id".to_string(), Value::Integer(1));
row.set("name".to_string(), Value::Text("test".to_string()));
let json = row.to_json();
assert!(json.is_object());
let obj = json.as_object().unwrap();
assert_eq!(obj.get("id"), Some(&serde_json::Value::Number(1.into())));
assert_eq!(
obj.get("name"),
Some(&serde_json::Value::String("test".to_string()))
);
}
// Both the borrowing and the consuming `IntoIterator` impls visit every row.
#[test]
fn test_result_iteration() {
let row1 = QueryRow::new(RowKey::new(vec![1]));
let row2 = QueryRow::new(RowKey::new(vec![2]));
let result = QueryResult::with_rows(vec![row1, row2]);
let mut count = 0;
for _row in &result {
count += 1;
}
assert_eq!(count, 2);
let mut count = 0;
for _row in result {
count += 1;
}
assert_eq!(count, 2);
}
}