use crate::error::Result;
use crate::storage::write_engine::mutation::{DecoratedKey, Mutation};
use std::collections::BTreeMap;
#[derive(Debug)]
pub struct Memtable {
data: BTreeMap<DecoratedKey, Vec<Mutation>>,
size_bytes: usize,
row_count: usize,
created_at: i64,
}
impl Memtable {
pub fn new() -> Self {
Self {
data: BTreeMap::new(),
size_bytes: 0,
row_count: 0,
created_at: Self::current_timestamp_micros(),
}
}
pub fn insert(&mut self, _mutation: Mutation) -> Result<()> {
Err(crate::error::Error::InvalidInput(
"Use insert_with_key() - decorated key must be provided with mutation".to_string(),
))
}
pub fn insert_with_key(&mut self, key: DecoratedKey, mutation: Mutation) -> Result<()> {
let mutation_size = Self::estimate_mutation_size(&mutation);
let mutations = self.data.entry(key).or_default();
mutations.push(mutation);
self.row_count += 1;
self.size_bytes += mutation_size;
Ok(())
}
pub fn get(&self, key: &DecoratedKey) -> Option<&[Mutation]> {
self.data.get(key).map(|v| v.as_slice())
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn size_bytes(&self) -> usize {
self.size_bytes
}
pub fn row_count(&self) -> usize {
self.row_count
}
pub fn should_flush(&self, threshold_bytes: usize) -> bool {
self.size_bytes >= threshold_bytes
}
pub fn created_at(&self) -> i64 {
self.created_at
}
pub fn iter(&self) -> impl Iterator<Item = (&DecoratedKey, &[Mutation])> {
self.data.iter().map(|(k, v)| (k, v.as_slice()))
}
pub fn clear(&mut self) {
self.data.clear();
self.size_bytes = 0;
self.row_count = 0;
}
const MAX_NESTING_DEPTH: usize = 32;
fn estimate_mutation_size(mutation: &Mutation) -> usize {
let mut size = 48;
for (col_name, value) in &mutation.partition_key.columns {
size += col_name.len();
size += Self::estimate_value_size_with_depth(value, 0);
}
if let Some(ref clustering_key) = mutation.clustering_key {
for (col_name, value) in &clustering_key.columns {
size += col_name.len();
size += Self::estimate_value_size_with_depth(value, 0);
}
}
for op in &mutation.operations {
size += Self::estimate_operation_size(op);
}
size
}
fn estimate_value_size(value: &crate::types::Value) -> usize {
Self::estimate_value_size_with_depth(value, 0)
}
fn estimate_value_size_with_depth(value: &crate::types::Value, depth: usize) -> usize {
use crate::types::Value;
if depth >= Self::MAX_NESTING_DEPTH {
return 1024;
}
match value {
Value::Null => 0,
Value::Boolean(_) => 1,
Value::TinyInt(_) => 1,
Value::SmallInt(_) => 2,
Value::Integer(_) => 4,
Value::BigInt(_) | Value::Counter(_) | Value::Timestamp(_) | Value::Time(_) => 8,
Value::Float32(_) => 4,
Value::Float(_) => 8,
Value::Text(s) => s.len(),
Value::Blob(bytes) | Value::Varint(bytes) | Value::Inet(bytes) => bytes.len(),
Value::Uuid(_) => 16,
Value::Date(_) => 4,
Value::Decimal { scale: _, unscaled } => 4 + unscaled.len(),
Value::Duration { .. } => 16,
Value::List(items) => {
items
.iter()
.map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
.sum::<usize>()
+ 16
}
Value::Set(items) => {
items
.iter()
.map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
.sum::<usize>()
+ 16
}
Value::Map(entries) => {
entries
.iter()
.map(|(k, v)| {
Self::estimate_value_size_with_depth(k, depth + 1)
+ Self::estimate_value_size_with_depth(v, depth + 1)
})
.sum::<usize>()
+ 16
}
Value::Udt(udt) => {
udt.fields
.iter()
.map(|field| {
field.name.len()
+ field
.value
.as_ref()
.map_or(0, |v| Self::estimate_value_size_with_depth(v, depth + 1))
})
.sum::<usize>()
+ 16
}
Value::Tuple(items) => {
items
.iter()
.map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
.sum::<usize>()
+ 16
}
Value::Json(json) => json.to_string().len(),
Value::Frozen(inner) => Self::estimate_value_size_with_depth(inner, depth + 1) + 8,
Value::Tombstone(_) => 24, }
}
fn estimate_operation_size(
op: &crate::storage::write_engine::mutation::CellOperation,
) -> usize {
use crate::storage::write_engine::mutation::CellOperation;
match op {
CellOperation::Write { column, value } => {
column.len() + Self::estimate_value_size(value) + 8 }
CellOperation::WriteWithTtl {
column,
value,
ttl_seconds: _,
} => {
column.len() + Self::estimate_value_size(value) + 16
}
CellOperation::Delete { column } => column.len() + 8,
CellOperation::DeleteRow => 8,
}
}
fn current_timestamp_micros() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as i64
}
}
impl Default for Memtable {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::write_engine::mutation::{
CellOperation, ClusteringKey, PartitionKey, TableId,
};
use crate::types::Value;
fn create_test_mutation(
id: i32,
name: &str,
clustering_val: Option<i64>,
) -> (DecoratedKey, Mutation) {
let table_id = TableId::new("test_ks", "test_table");
let partition_key = PartitionKey::single("id", Value::Integer(id));
let key_bytes = id.to_be_bytes().to_vec();
let decorated_key = DecoratedKey::from_key_bytes(key_bytes).unwrap();
let clustering_key =
clustering_val.map(|val| ClusteringKey::single("ts", Value::BigInt(val)));
let operations = vec![CellOperation::Write {
column: "name".to_string(),
value: Value::Text(name.to_string()),
}];
let mutation = Mutation::new(
table_id,
partition_key,
clustering_key,
operations,
1234567890,
None,
);
(decorated_key, mutation)
}
#[test]
fn test_memtable_new() {
let memtable = Memtable::new();
assert!(memtable.is_empty());
assert_eq!(memtable.size_bytes(), 0);
assert_eq!(memtable.row_count(), 0);
assert!(memtable.created_at() > 0);
}
#[test]
fn test_memtable_insert_and_get() {
let mut memtable = Memtable::new();
let (key, mutation) = create_test_mutation(1, "Alice", None);
memtable.insert_with_key(key.clone(), mutation).unwrap();
assert!(!memtable.is_empty());
assert_eq!(memtable.row_count(), 1);
assert!(memtable.size_bytes() > 0);
let mutations = memtable.get(&key).unwrap();
assert_eq!(mutations.len(), 1);
assert_eq!(mutations[0].table.table, "test_table");
}
#[test]
fn test_memtable_multiple_mutations_same_partition() {
let mut memtable = Memtable::new();
let (key, mutation1) = create_test_mutation(1, "Alice", Some(1000));
let (_, mutation2) = create_test_mutation(1, "Alice Updated", Some(2000));
memtable.insert_with_key(key.clone(), mutation1).unwrap();
memtable.insert_with_key(key.clone(), mutation2).unwrap();
assert_eq!(memtable.row_count(), 2);
let mutations = memtable.get(&key).unwrap();
assert_eq!(mutations.len(), 2);
}
#[test]
fn test_memtable_multiple_partitions() {
let mut memtable = Memtable::new();
let (key1, mutation1) = create_test_mutation(1, "Alice", None);
let (key2, mutation2) = create_test_mutation(2, "Bob", None);
let (key3, mutation3) = create_test_mutation(3, "Charlie", None);
memtable.insert_with_key(key1, mutation1).unwrap();
memtable.insert_with_key(key2, mutation2).unwrap();
memtable.insert_with_key(key3, mutation3).unwrap();
assert_eq!(memtable.row_count(), 3);
assert!(!memtable.is_empty());
}
#[test]
fn test_memtable_token_ordering() {
let mut memtable = Memtable::new();
let (key3, mutation3) = create_test_mutation(300, "Charlie", None);
let (key1, mutation1) = create_test_mutation(100, "Alice", None);
let (key2, mutation2) = create_test_mutation(200, "Bob", None);
memtable.insert_with_key(key3.clone(), mutation3).unwrap();
memtable.insert_with_key(key1.clone(), mutation1).unwrap();
memtable.insert_with_key(key2.clone(), mutation2).unwrap();
let keys: Vec<_> = memtable.iter().map(|(k, _)| k.token).collect();
assert_eq!(keys.len(), 3);
assert!(keys.windows(2).all(|w| w[0] <= w[1]));
}
#[test]
fn test_memtable_size_tracking() {
let mut memtable = Memtable::new();
let initial_size = memtable.size_bytes();
assert_eq!(initial_size, 0);
let (key, mutation) = create_test_mutation(1, "Alice", None);
memtable.insert_with_key(key, mutation).unwrap();
assert!(memtable.size_bytes() > initial_size);
let size_after_insert = memtable.size_bytes();
let (key2, mutation2) = create_test_mutation(2, "Bob with a longer name", None);
memtable.insert_with_key(key2, mutation2).unwrap();
assert!(memtable.size_bytes() > size_after_insert);
}
#[test]
fn test_memtable_should_flush() {
let mut memtable = Memtable::new();
assert!(!memtable.should_flush(1024));
for i in 0..100 {
let (key, mutation) = create_test_mutation(i, "Test data", None);
memtable.insert_with_key(key, mutation).unwrap();
}
let current_size = memtable.size_bytes();
assert!(memtable.should_flush(current_size - 1));
assert!(!memtable.should_flush(current_size + 1000));
}
#[test]
fn test_memtable_clear() {
let mut memtable = Memtable::new();
let created_at = memtable.created_at();
let (key, mutation) = create_test_mutation(1, "Alice", None);
memtable.insert_with_key(key, mutation).unwrap();
assert!(!memtable.is_empty());
assert!(memtable.size_bytes() > 0);
assert!(memtable.row_count() > 0);
memtable.clear();
assert!(memtable.is_empty());
assert_eq!(memtable.size_bytes(), 0);
assert_eq!(memtable.row_count(), 0);
assert_eq!(memtable.created_at(), created_at); }
#[test]
fn test_memtable_iterator() {
let mut memtable = Memtable::new();
let (key1, mutation1) = create_test_mutation(1, "Alice", None);
let (key2, mutation2) = create_test_mutation(2, "Bob", None);
memtable.insert_with_key(key1.clone(), mutation1).unwrap();
memtable.insert_with_key(key2.clone(), mutation2).unwrap();
let mut count = 0;
for (key, mutations) in memtable.iter() {
assert!(!mutations.is_empty());
assert!([key1.token, key2.token].contains(&key.token));
count += 1;
}
assert_eq!(count, 2);
}
#[test]
fn test_memtable_empty_check() {
let mut memtable = Memtable::new();
assert!(memtable.is_empty());
let (key, mutation) = create_test_mutation(1, "Alice", None);
memtable.insert_with_key(key, mutation).unwrap();
assert!(!memtable.is_empty());
memtable.clear();
assert!(memtable.is_empty());
}
#[test]
fn test_memtable_size_estimates() {
let small_text = Value::Text("hi".to_string());
let large_text = Value::Text("a".repeat(1000));
let integer = Value::Integer(42);
let uuid = Value::Uuid([0u8; 16]);
assert_eq!(Memtable::estimate_value_size(&small_text), 2);
assert_eq!(Memtable::estimate_value_size(&large_text), 1000);
assert_eq!(Memtable::estimate_value_size(&integer), 4);
assert_eq!(Memtable::estimate_value_size(&uuid), 16);
}
#[test]
fn test_memtable_collection_size_estimates() {
let list = Value::List(vec![
Value::Integer(1),
Value::Integer(2),
Value::Integer(3),
]);
let size = Memtable::estimate_value_size(&list);
assert!(size >= 12);
let set = Value::Set(vec![
Value::Text("a".to_string()),
Value::Text("b".to_string()),
]);
let size = Memtable::estimate_value_size(&set);
assert!(size >= 2);
let map = Value::Map(vec![
(Value::Integer(1), Value::Text("one".to_string())),
(Value::Integer(2), Value::Text("two".to_string())),
]);
let size = Memtable::estimate_value_size(&map);
assert!(size >= 11); }
#[test]
fn test_memtable_realistic_flush_threshold() {
let mut memtable = Memtable::new();
let flush_threshold = 64 * 1024 * 1024;
for i in 0..10_000 {
let (key, mutation) = create_test_mutation(
i,
"Typical user data with moderate length name",
Some(i as i64),
);
memtable.insert_with_key(key, mutation).unwrap();
}
let final_size = memtable.size_bytes();
println!(
"10K mutations size: {} bytes ({} KB)",
final_size,
final_size / 1024
);
assert!(final_size < flush_threshold);
let avg_size = final_size / 10_000;
println!("Average mutation size: {} bytes", avg_size);
assert!(avg_size > 0);
assert!(avg_size < 10_000); }
#[test]
fn test_memtable_get_nonexistent_key() {
let memtable = Memtable::new();
let key = DecoratedKey::new(12345, vec![0, 0, 0, 99]);
assert!(memtable.get(&key).is_none());
}
#[test]
fn test_memtable_insert_deprecated_api() {
let mut memtable = Memtable::new();
let table_id = TableId::new("test_ks", "test_table");
let partition_key = PartitionKey::single("id", Value::Integer(1));
let operations = vec![CellOperation::Write {
column: "name".to_string(),
value: Value::Text("Alice".to_string()),
}];
let mutation = Mutation::new(table_id, partition_key, None, operations, 1234567890, None);
let result = memtable.insert(mutation);
assert!(result.is_err());
}
#[test]
fn test_memtable_nested_collection_depth_limit() {
let mut nested_value = Value::Integer(42);
for _ in 0..40 {
nested_value = Value::List(vec![nested_value]);
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
assert!(
size >= 1024,
"Should use conservative estimate at max depth"
);
}
#[test]
fn test_memtable_nested_map_depth_limit() {
let mut nested_value = Value::Text("bottom".to_string());
for _ in 0..35 {
nested_value = Value::Map(vec![(Value::Integer(1), nested_value)]);
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
assert!(
size >= 1024,
"Should use conservative estimate for deep nesting"
);
}
#[test]
fn test_memtable_nested_udt_depth_limit() {
use crate::types::{UdtField, UdtValue};
let mut nested_value = Value::Integer(1);
for i in 0..35 {
let udt = UdtValue {
type_name: format!("type_{}", i),
keyspace: "test_ks".to_string(),
fields: vec![UdtField {
name: "field".to_string(),
value: Some(nested_value),
}],
};
nested_value = Value::Udt(udt);
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
assert!(size >= 1024);
}
#[test]
fn test_memtable_frozen_nested_depth_limit() {
let mut nested_value = Value::Integer(99);
for _ in 0..40 {
nested_value = Value::Frozen(Box::new(nested_value));
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
assert!(size >= 1024);
}
#[test]
fn test_memtable_mixed_nested_collections() {
use crate::types::{UdtField, UdtValue};
let mut nested_value = Value::Text("base".to_string());
for i in 0..50 {
nested_value = match i % 5 {
0 => Value::List(vec![nested_value]),
1 => Value::Set(vec![nested_value]),
2 => Value::Map(vec![(Value::Integer(i), nested_value)]),
3 => Value::Tuple(vec![nested_value]),
4 => Value::Udt(UdtValue {
type_name: format!("type_{}", i),
keyspace: "test_ks".to_string(),
fields: vec![UdtField {
name: "f".to_string(),
value: Some(nested_value),
}],
}),
_ => unreachable!(),
};
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
}
#[test]
fn test_memtable_depth_limit_exact_boundary() {
let mut nested_value = Value::Integer(1);
for _ in 0..32 {
nested_value = Value::List(vec![nested_value]);
}
let size = Memtable::estimate_value_size(&nested_value);
assert!(size > 0);
nested_value = Value::List(vec![nested_value]);
let size_over = Memtable::estimate_value_size(&nested_value);
assert!(size_over >= 1024);
}
#[test]
fn test_memtable_shallow_collections_unaffected() {
let simple_list = Value::List(vec![
Value::Integer(1),
Value::Integer(2),
Value::Integer(3),
]);
let size = Memtable::estimate_value_size(&simple_list);
assert_eq!(size, 12 + 16);
let shallow_nested =
Value::List(vec![Value::List(vec![Value::List(vec![Value::Integer(
1,
)])])]);
let size = Memtable::estimate_value_size(&shallow_nested);
assert!(size > 0);
assert!(size < 1024); }
}