use super::types::{parse_list, parse_map, parse_set, CqlTypeId};
use super::vint::encode_vint;
use super::*;
use crate::schema::{CqlType, TableSchema};
use crate::types::Value;
#[cfg(test)]
mod cassandra_format_tests {
use super::*;
/// Parses a JSON table schema that declares plain and frozen collection
/// columns and checks that the collection columns survive parsing.
///
/// The non-frozen columns must report exactly the type string written in the
/// JSON. The frozen columns are only required to be present after parsing.
#[test]
fn test_collections_table_schema_compatibility() {
    let schema_json = r#"{
"keyspace": "test_keyspace",
"table": "collections_table",
"partition_keys": [
{"name": "id", "type": "uuid", "position": 0}
],
"clustering_keys": [],
"columns": [
{"name": "id", "type": "uuid", "nullable": false},
{"name": "list_col", "type": "list<text>", "nullable": true},
{"name": "set_col", "type": "set<int>", "nullable": true},
{"name": "map_col", "type": "map<text, int>", "nullable": true},
{"name": "frozen_list", "type": "frozen<list<text>>", "nullable": true},
{"name": "frozen_set", "type": "frozen<set<int>>", "nullable": true},
{"name": "frozen_map", "type": "frozen<map<text, int>>", "nullable": true}
]
}"#;
    let schema = TableSchema::from_json(schema_json).unwrap();

    // Plain collection columns: the stored type string must match the JSON.
    let expected_types = [
        ("list_col", "list<text>"),
        ("set_col", "set<int>"),
        ("map_col", "map<text, int>"),
    ];
    for (name, expected_type) in expected_types.iter() {
        let column = schema
            .columns
            .iter()
            .find(|c| c.name == *name)
            .unwrap_or_else(|| panic!("column `{}` missing from parsed schema", name));
        assert_eq!(
            column.data_type, *expected_type,
            "unexpected data type for column `{}`",
            name
        );
    }

    // Frozen collection columns: make sure the parser did not drop them.
    for name in ["frozen_list", "frozen_set", "frozen_map"].iter() {
        assert!(
            schema.columns.iter().any(|c| c.name == *name),
            "frozen column `{}` missing from parsed schema",
            name
        );
    }
}
/// Feeds `parse_list` a hand-built three-element text list and checks the
/// decoded strings.
///
/// Payload layout: vint element count, one `CqlTypeId::Varchar` byte, then
/// each string as a vint length followed by its bytes.
#[test]
fn test_list_text_parsing_cassandra_format() {
    let expected = vec!["hello", "world", "cassandra"];

    let mut payload = Vec::new();
    payload.extend_from_slice(&encode_vint(expected.len() as i64));
    payload.push(CqlTypeId::Varchar as u8);
    for text in expected.iter() {
        payload.extend_from_slice(&encode_vint(text.len() as i64));
        payload.extend_from_slice(text.as_bytes());
    }

    let (rest, parsed) = parse_list(&payload).unwrap();
    // The parser must consume the whole payload.
    assert!(rest.is_empty());

    let items = match parsed {
        Value::List(items) => items,
        _ => panic!("Expected list value"),
    };
    assert_eq!(items.len(), 3);
    for (index, (item, want)) in items.iter().zip(expected.iter()).enumerate() {
        match item {
            Value::Text(text) => assert_eq!(text, *want),
            _ => panic!("Expected text value at index {}", index),
        }
    }
}
/// Feeds `parse_set` a hand-built four-element int set and checks the decoded
/// integers.
///
/// Payload layout: vint element count, one `CqlTypeId::Int` byte, then each
/// value as a vint length of 4 followed by its big-endian bytes. The decoded
/// elements are compared positionally, so the test relies on `parse_set`
/// returning them in encoded order.
#[test]
fn test_set_int_parsing_cassandra_format() {
    let expected = vec![1i32, 42, 100, -5];

    let mut payload = Vec::new();
    payload.extend_from_slice(&encode_vint(expected.len() as i64));
    payload.push(CqlTypeId::Int as u8);
    for number in expected.iter() {
        payload.extend_from_slice(&encode_vint(4));
        payload.extend_from_slice(&number.to_be_bytes());
    }

    let (rest, parsed) = parse_set(&payload).unwrap();
    // The parser must consume the whole payload.
    assert!(rest.is_empty());

    let members = match parsed {
        Value::Set(members) => members,
        _ => panic!("Expected set value"),
    };
    assert_eq!(members.len(), 4);
    for (index, (member, want)) in members.iter().zip(expected.iter()).enumerate() {
        match member {
            Value::Integer(number) => assert_eq!(*number, *want),
            _ => panic!("Expected integer value at index {}", index),
        }
    }
}
/// Feeds `parse_map` a hand-built three-entry text-to-int map and checks the
/// decoded pairs.
///
/// Payload layout: vint entry count, a `CqlTypeId::Varchar` key-type byte, a
/// `CqlTypeId::Int` value-type byte, then per entry a vint-length-prefixed key
/// and a vint length of 4 followed by the big-endian value. Entries are
/// compared positionally against the encoded order.
#[test]
fn test_map_text_int_parsing_cassandra_format() {
    let expected = vec![("key1", 10i32), ("key2", 20i32), ("key3", 30i32)];

    let mut payload = Vec::new();
    payload.extend_from_slice(&encode_vint(expected.len() as i64));
    payload.push(CqlTypeId::Varchar as u8);
    payload.push(CqlTypeId::Int as u8);
    for (name, number) in expected.iter() {
        payload.extend_from_slice(&encode_vint(name.len() as i64));
        payload.extend_from_slice(name.as_bytes());
        payload.extend_from_slice(&encode_vint(4));
        payload.extend_from_slice(&number.to_be_bytes());
    }

    let (rest, parsed) = parse_map(&payload).unwrap();
    // The parser must consume the whole payload.
    assert!(rest.is_empty());

    let entries = match parsed {
        Value::Map(entries) => entries,
        _ => panic!("Expected map value"),
    };
    assert_eq!(entries.len(), 3);
    for (index, (entry, want)) in entries.iter().zip(expected.iter()).enumerate() {
        let (want_key, want_value) = want;
        match entry {
            (Value::Text(key_text), Value::Integer(value_int)) => {
                assert_eq!(key_text, *want_key);
                assert_eq!(*value_int, *want_value);
            }
            _ => panic!("Expected text key and integer value at index {}", index),
        }
    }
}
/// Checks that `parse_list` and `parse_map` return an error for a header
/// announcing 2,000,000 elements.
///
/// NOTE(review): the payloads contain only the count and type bytes, with no
/// element data. The error could therefore come from running out of input
/// rather than from an element-count limit; confirm against the parser.
#[test]
fn test_memory_safety_large_collections() {
    // List header: oversized count followed by the element type id.
    let list_header = {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&encode_vint(2_000_000));
        bytes.push(CqlTypeId::Int as u8);
        bytes
    };
    assert!(
        parse_list(&list_header).is_err(),
        "Should reject lists with > 1M elements"
    );

    // Map header: oversized count followed by key and value type ids.
    let map_header = {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&encode_vint(2_000_000));
        bytes.push(CqlTypeId::Varchar as u8);
        bytes.push(CqlTypeId::Int as u8);
        bytes
    };
    assert!(
        parse_map(&map_header).is_err(),
        "Should reject maps with > 1M elements"
    );
}
/// Checks that a payload consisting solely of a zero element count decodes to
/// an empty list, set, and map, leaving no unread bytes.
///
/// No type-id bytes follow the count in this payload.
#[test]
fn test_empty_collections_edge_cases() {
    let zero_count = encode_vint(0);

    let (list_rest, list_value) = parse_list(&zero_count).unwrap();
    assert!(list_rest.is_empty());
    assert_eq!(list_value, Value::List(vec![]));

    let (set_rest, set_value) = parse_set(&zero_count).unwrap();
    assert!(set_rest.is_empty());
    assert_eq!(set_value, Value::Set(vec![]));

    let (map_rest, map_value) = parse_map(&zero_count).unwrap();
    assert!(map_rest.is_empty());
    assert_eq!(map_value, Value::Map(vec![]));
}
/// Checks that the numeric codes 0x20, 0x21 and 0x22 convert to the list, map
/// and set type ids respectively.
#[test]
fn test_collection_type_identification() {
    let cases = [
        (0x20, CqlTypeId::List),
        (0x21, CqlTypeId::Map),
        (0x22, CqlTypeId::Set),
    ];
    for (code, expected) in cases.iter() {
        assert_eq!(CqlTypeId::try_from(*code).unwrap(), *expected);
    }
}
/// Checks that a list of three integers passes collection type validation and
/// reports `CqlType::List` with an `Int` element type.
///
/// NOTE(review): despite the name, only a homogeneous list is exercised here;
/// no mixed-type collection is validated.
#[test]
fn test_mixed_type_validation() {
    let elements: Vec<Value> = (1..=3).map(Value::Integer).collect();
    let int_list = Value::List(elements);

    assert!(int_list.validate_collection_types().is_ok());

    match int_list.data_type() {
        CqlType::List(element_type) => assert_eq!(*element_type, CqlType::Int),
        _ => panic!("Expected List type"),
    }
}
/// Decodes a text list wrapped in a length-prefixed envelope: a vint byte
/// length followed by the collection payload.
///
/// The test reads the length, slices out exactly that many bytes, and parses
/// the slice with `parse_list`. Both the envelope and the inner payload must
/// be consumed completely.
#[test]
fn test_sstable_compatibility_format() {
    // Inner payload: element count, element type id, then each string as a
    // vint length followed by its bytes.
    let mut collection_content = Vec::new();
    collection_content.extend_from_slice(&encode_vint(2));
    collection_content.push(CqlTypeId::Varchar as u8);
    for word in ["hello", "world"].iter() {
        collection_content.extend_from_slice(&encode_vint(word.len() as i64));
        collection_content.extend_from_slice(word.as_bytes());
    }

    // Envelope: vint byte length of the payload, then the payload itself.
    let mut sstable_data = Vec::new();
    sstable_data.extend_from_slice(&encode_vint(collection_content.len() as i64));
    sstable_data.extend_from_slice(&collection_content);

    let (after_length, collection_length) =
        super::vint::parse_vint_length(&sstable_data).unwrap();
    let (after_collection, collection_data) =
        nom::bytes::complete::take::<_, _, nom::error::Error<_>>(collection_length)(after_length)
            .unwrap();
    assert!(
        after_collection.is_empty(),
        "length prefix should cover the entire collection payload"
    );

    // The sibling list test asserts full consumption for this same layout;
    // do the same here instead of discarding the remainder.
    let (unparsed, value) = parse_list(collection_data).unwrap();
    assert!(
        unparsed.is_empty(),
        "parse_list should consume the entire collection payload"
    );
    assert_eq!(
        value,
        Value::List(vec![
            Value::Text("hello".to_string()),
            Value::Text("world".to_string()),
        ])
    );
}
}
#[cfg(test)]
mod performance_tests {
use super::*;
use crate::parser::vint::encode_vint;
/// Checks that `parse_list` decodes a 1000-element int list in under 100 ms
/// and spot-checks the first, middle and last decoded values.
///
/// The timer covers only the `parse_list` call; building the encoded fixture
/// is excluded so the budget reflects parsing alone.
#[test]
fn test_collection_parsing_performance() {
    // Fixture: vint count, element type id, then each value as a vint length
    // of 4 followed by its big-endian bytes.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_vint(1000));
    data.push(CqlTypeId::Int as u8);
    for i in 0..1000i32 {
        data.extend_from_slice(&encode_vint(4));
        data.extend_from_slice(&i.to_be_bytes());
    }

    // Start timing after the fixture is built so encoding cost is not counted.
    let start = std::time::Instant::now();
    let (_, value) = parse_list(&data).unwrap();
    let duration = start.elapsed();
    assert!(
        duration.as_millis() < 100,
        "Parsing 1000 elements should take < 100ms, took {:?}",
        duration
    );

    match value {
        Value::List(elements) => {
            assert_eq!(elements.len(), 1000);
            assert_eq!(elements[0], Value::Integer(0));
            assert_eq!(elements[500], Value::Integer(500));
            assert_eq!(elements[999], Value::Integer(999));
        }
        _ => panic!("Expected list value"),
    }
}
}