1use crate::error::Result;
10use crate::storage::write_engine::mutation::{DecoratedKey, Mutation};
11use std::collections::BTreeMap;
12
13#[derive(Debug)]
18pub struct Memtable {
19 data: BTreeMap<DecoratedKey, Vec<Mutation>>,
21 size_bytes: usize,
23 row_count: usize,
25 created_at: i64,
27}
28
29impl Memtable {
30 pub fn new() -> Self {
32 Self {
33 data: BTreeMap::new(),
34 size_bytes: 0,
35 row_count: 0,
36 created_at: Self::current_timestamp_micros(),
37 }
38 }
39
40 pub fn insert(&mut self, _mutation: Mutation) -> Result<()> {
45 Err(crate::error::Error::InvalidInput(
61 "Use insert_with_key() - decorated key must be provided with mutation".to_string(),
62 ))
63 }
64
65 pub fn insert_with_key(&mut self, key: DecoratedKey, mutation: Mutation) -> Result<()> {
70 let mutation_size = Self::estimate_mutation_size(&mutation);
72
73 let mutations = self.data.entry(key).or_default();
75
76 mutations.push(mutation);
78 self.row_count += 1;
79 self.size_bytes += mutation_size;
80
81 Ok(())
82 }
83
84 pub fn get(&self, key: &DecoratedKey) -> Option<&[Mutation]> {
86 self.data.get(key).map(|v| v.as_slice())
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.data.is_empty()
92 }
93
94 pub fn size_bytes(&self) -> usize {
96 self.size_bytes
97 }
98
99 pub fn row_count(&self) -> usize {
101 self.row_count
102 }
103
104 pub fn should_flush(&self, threshold_bytes: usize) -> bool {
106 self.size_bytes >= threshold_bytes
107 }
108
109 pub fn created_at(&self) -> i64 {
111 self.created_at
112 }
113
114 pub fn iter(&self) -> impl Iterator<Item = (&DecoratedKey, &[Mutation])> {
118 self.data.iter().map(|(k, v)| (k, v.as_slice()))
119 }
120
121 pub fn clear(&mut self) {
125 self.data.clear();
126 self.size_bytes = 0;
127 self.row_count = 0;
128 }
130
131 const MAX_NESTING_DEPTH: usize = 32;
133
134 fn estimate_mutation_size(mutation: &Mutation) -> usize {
142 let mut size = 48; for (col_name, value) in &mutation.partition_key.columns {
146 size += col_name.len();
147 size += Self::estimate_value_size_with_depth(value, 0);
148 }
149
150 if let Some(ref clustering_key) = mutation.clustering_key {
152 for (col_name, value) in &clustering_key.columns {
153 size += col_name.len();
154 size += Self::estimate_value_size_with_depth(value, 0);
155 }
156 }
157
158 for op in &mutation.operations {
160 size += Self::estimate_operation_size(op);
161 }
162
163 size
164 }
165
166 fn estimate_value_size(value: &crate::types::Value) -> usize {
168 Self::estimate_value_size_with_depth(value, 0)
169 }
170
171 fn estimate_value_size_with_depth(value: &crate::types::Value, depth: usize) -> usize {
176 use crate::types::Value;
177
178 if depth >= Self::MAX_NESTING_DEPTH {
180 return 1024;
182 }
183
184 match value {
185 Value::Null => 0,
186 Value::Boolean(_) => 1,
187 Value::TinyInt(_) => 1,
188 Value::SmallInt(_) => 2,
189 Value::Integer(_) => 4,
190 Value::BigInt(_) | Value::Counter(_) | Value::Timestamp(_) | Value::Time(_) => 8,
191 Value::Float32(_) => 4,
192 Value::Float(_) => 8,
193 Value::Text(s) => s.len(),
194 Value::Blob(bytes) | Value::Varint(bytes) | Value::Inet(bytes) => bytes.len(),
195 Value::Uuid(_) => 16,
196 Value::Date(_) => 4,
197 Value::Decimal { scale: _, unscaled } => 4 + unscaled.len(),
198 Value::Duration { .. } => 16,
199 Value::List(items) => {
200 items
201 .iter()
202 .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
203 .sum::<usize>()
204 + 16
205 }
206 Value::Set(items) => {
207 items
208 .iter()
209 .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
210 .sum::<usize>()
211 + 16
212 }
213 Value::Map(entries) => {
214 entries
215 .iter()
216 .map(|(k, v)| {
217 Self::estimate_value_size_with_depth(k, depth + 1)
218 + Self::estimate_value_size_with_depth(v, depth + 1)
219 })
220 .sum::<usize>()
221 + 16
222 }
223 Value::Udt(udt) => {
224 udt.fields
225 .iter()
226 .map(|field| {
227 field.name.len()
228 + field
229 .value
230 .as_ref()
231 .map_or(0, |v| Self::estimate_value_size_with_depth(v, depth + 1))
232 })
233 .sum::<usize>()
234 + 16
235 }
236 Value::Tuple(items) => {
237 items
238 .iter()
239 .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
240 .sum::<usize>()
241 + 16
242 }
243 Value::Json(json) => json.to_string().len(),
244 Value::Frozen(inner) => Self::estimate_value_size_with_depth(inner, depth + 1) + 8,
245 Value::Tombstone(_) => 24, }
247 }
248
249 fn estimate_operation_size(
251 op: &crate::storage::write_engine::mutation::CellOperation,
252 ) -> usize {
253 use crate::storage::write_engine::mutation::CellOperation;
254
255 match op {
256 CellOperation::Write { column, value } => {
257 column.len() + Self::estimate_value_size(value) + 8 }
259 CellOperation::WriteWithTtl {
260 column,
261 value,
262 ttl_seconds: _,
263 } => {
264 column.len() + Self::estimate_value_size(value) + 16
266 }
267 CellOperation::Delete { column } => column.len() + 8,
268 CellOperation::DeleteRow => 8,
269 }
270 }
271
272 fn current_timestamp_micros() -> i64 {
274 std::time::SystemTime::now()
275 .duration_since(std::time::UNIX_EPOCH)
276 .unwrap_or_default()
277 .as_micros() as i64
278 }
279}
280
281impl Default for Memtable {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::storage::write_engine::mutation::{
291 CellOperation, ClusteringKey, PartitionKey, TableId,
292 };
293 use crate::types::Value;
294
295 fn create_test_mutation(
296 id: i32,
297 name: &str,
298 clustering_val: Option<i64>,
299 ) -> (DecoratedKey, Mutation) {
300 let table_id = TableId::new("test_ks", "test_table");
301 let partition_key = PartitionKey::single("id", Value::Integer(id));
302
303 let key_bytes = id.to_be_bytes().to_vec();
305 let decorated_key = DecoratedKey::from_key_bytes(key_bytes).unwrap();
306
307 let clustering_key =
308 clustering_val.map(|val| ClusteringKey::single("ts", Value::BigInt(val)));
309
310 let operations = vec![CellOperation::Write {
311 column: "name".to_string(),
312 value: Value::Text(name.to_string()),
313 }];
314
315 let mutation = Mutation::new(
316 table_id,
317 partition_key,
318 clustering_key,
319 operations,
320 1234567890,
321 None,
322 );
323
324 (decorated_key, mutation)
325 }
326
327 #[test]
328 fn test_memtable_new() {
329 let memtable = Memtable::new();
330 assert!(memtable.is_empty());
331 assert_eq!(memtable.size_bytes(), 0);
332 assert_eq!(memtable.row_count(), 0);
333 assert!(memtable.created_at() > 0);
334 }
335
336 #[test]
337 fn test_memtable_insert_and_get() {
338 let mut memtable = Memtable::new();
339
340 let (key, mutation) = create_test_mutation(1, "Alice", None);
341 memtable.insert_with_key(key.clone(), mutation).unwrap();
342
343 assert!(!memtable.is_empty());
344 assert_eq!(memtable.row_count(), 1);
345 assert!(memtable.size_bytes() > 0);
346
347 let mutations = memtable.get(&key).unwrap();
349 assert_eq!(mutations.len(), 1);
350 assert_eq!(mutations[0].table.table, "test_table");
351 }
352
353 #[test]
354 fn test_memtable_multiple_mutations_same_partition() {
355 let mut memtable = Memtable::new();
356
357 let (key, mutation1) = create_test_mutation(1, "Alice", Some(1000));
359 let (_, mutation2) = create_test_mutation(1, "Alice Updated", Some(2000));
360
361 memtable.insert_with_key(key.clone(), mutation1).unwrap();
362 memtable.insert_with_key(key.clone(), mutation2).unwrap();
363
364 assert_eq!(memtable.row_count(), 2);
365
366 let mutations = memtable.get(&key).unwrap();
368 assert_eq!(mutations.len(), 2);
369 }
370
371 #[test]
372 fn test_memtable_multiple_partitions() {
373 let mut memtable = Memtable::new();
374
375 let (key1, mutation1) = create_test_mutation(1, "Alice", None);
376 let (key2, mutation2) = create_test_mutation(2, "Bob", None);
377 let (key3, mutation3) = create_test_mutation(3, "Charlie", None);
378
379 memtable.insert_with_key(key1, mutation1).unwrap();
380 memtable.insert_with_key(key2, mutation2).unwrap();
381 memtable.insert_with_key(key3, mutation3).unwrap();
382
383 assert_eq!(memtable.row_count(), 3);
384 assert!(!memtable.is_empty());
385 }
386
387 #[test]
388 fn test_memtable_token_ordering() {
389 let mut memtable = Memtable::new();
390
391 let (key3, mutation3) = create_test_mutation(300, "Charlie", None);
393 let (key1, mutation1) = create_test_mutation(100, "Alice", None);
394 let (key2, mutation2) = create_test_mutation(200, "Bob", None);
395
396 memtable.insert_with_key(key3.clone(), mutation3).unwrap();
397 memtable.insert_with_key(key1.clone(), mutation1).unwrap();
398 memtable.insert_with_key(key2.clone(), mutation2).unwrap();
399
400 let keys: Vec<_> = memtable.iter().map(|(k, _)| k.token).collect();
402 assert_eq!(keys.len(), 3);
403
404 assert!(keys.windows(2).all(|w| w[0] <= w[1]));
406 }
407
408 #[test]
409 fn test_memtable_size_tracking() {
410 let mut memtable = Memtable::new();
411
412 let initial_size = memtable.size_bytes();
413 assert_eq!(initial_size, 0);
414
415 let (key, mutation) = create_test_mutation(1, "Alice", None);
417 memtable.insert_with_key(key, mutation).unwrap();
418
419 assert!(memtable.size_bytes() > initial_size);
421 let size_after_insert = memtable.size_bytes();
422
423 let (key2, mutation2) = create_test_mutation(2, "Bob with a longer name", None);
425 memtable.insert_with_key(key2, mutation2).unwrap();
426
427 assert!(memtable.size_bytes() > size_after_insert);
428 }
429
430 #[test]
431 fn test_memtable_should_flush() {
432 let mut memtable = Memtable::new();
433
434 assert!(!memtable.should_flush(1024));
436
437 for i in 0..100 {
439 let (key, mutation) = create_test_mutation(i, "Test data", None);
440 memtable.insert_with_key(key, mutation).unwrap();
441 }
442
443 let current_size = memtable.size_bytes();
445 assert!(memtable.should_flush(current_size - 1));
446 assert!(!memtable.should_flush(current_size + 1000));
447 }
448
449 #[test]
450 fn test_memtable_clear() {
451 let mut memtable = Memtable::new();
452
453 let created_at = memtable.created_at();
454
455 let (key, mutation) = create_test_mutation(1, "Alice", None);
457 memtable.insert_with_key(key, mutation).unwrap();
458
459 assert!(!memtable.is_empty());
460 assert!(memtable.size_bytes() > 0);
461 assert!(memtable.row_count() > 0);
462
463 memtable.clear();
465
466 assert!(memtable.is_empty());
467 assert_eq!(memtable.size_bytes(), 0);
468 assert_eq!(memtable.row_count(), 0);
469 assert_eq!(memtable.created_at(), created_at); }
471
472 #[test]
473 fn test_memtable_iterator() {
474 let mut memtable = Memtable::new();
475
476 let (key1, mutation1) = create_test_mutation(1, "Alice", None);
478 let (key2, mutation2) = create_test_mutation(2, "Bob", None);
479
480 memtable.insert_with_key(key1.clone(), mutation1).unwrap();
481 memtable.insert_with_key(key2.clone(), mutation2).unwrap();
482
483 let mut count = 0;
485 for (key, mutations) in memtable.iter() {
486 assert!(!mutations.is_empty());
487 assert!([key1.token, key2.token].contains(&key.token));
488 count += 1;
489 }
490
491 assert_eq!(count, 2);
492 }
493
494 #[test]
495 fn test_memtable_empty_check() {
496 let mut memtable = Memtable::new();
497 assert!(memtable.is_empty());
498
499 let (key, mutation) = create_test_mutation(1, "Alice", None);
500 memtable.insert_with_key(key, mutation).unwrap();
501 assert!(!memtable.is_empty());
502
503 memtable.clear();
504 assert!(memtable.is_empty());
505 }
506
507 #[test]
508 fn test_memtable_size_estimates() {
509 let small_text = Value::Text("hi".to_string());
511 let large_text = Value::Text("a".repeat(1000));
512 let integer = Value::Integer(42);
513 let uuid = Value::Uuid([0u8; 16]);
514
515 assert_eq!(Memtable::estimate_value_size(&small_text), 2);
516 assert_eq!(Memtable::estimate_value_size(&large_text), 1000);
517 assert_eq!(Memtable::estimate_value_size(&integer), 4);
518 assert_eq!(Memtable::estimate_value_size(&uuid), 16);
519 }
520
521 #[test]
522 fn test_memtable_collection_size_estimates() {
523 let list = Value::List(vec![
525 Value::Integer(1),
526 Value::Integer(2),
527 Value::Integer(3),
528 ]);
529 let size = Memtable::estimate_value_size(&list);
530 assert!(size >= 12); let set = Value::Set(vec![
534 Value::Text("a".to_string()),
535 Value::Text("b".to_string()),
536 ]);
537 let size = Memtable::estimate_value_size(&set);
538 assert!(size >= 2); let map = Value::Map(vec![
542 (Value::Integer(1), Value::Text("one".to_string())),
543 (Value::Integer(2), Value::Text("two".to_string())),
544 ]);
545 let size = Memtable::estimate_value_size(&map);
546 assert!(size >= 11); }
548
549 #[test]
550 fn test_memtable_realistic_flush_threshold() {
551 let mut memtable = Memtable::new();
552
553 let flush_threshold = 64 * 1024 * 1024; for i in 0..10_000 {
559 let (key, mutation) = create_test_mutation(
560 i,
561 "Typical user data with moderate length name",
562 Some(i as i64),
563 );
564 memtable.insert_with_key(key, mutation).unwrap();
565 }
566
567 let final_size = memtable.size_bytes();
568 println!(
569 "10K mutations size: {} bytes ({} KB)",
570 final_size,
571 final_size / 1024
572 );
573
574 assert!(final_size < flush_threshold);
576
577 let avg_size = final_size / 10_000;
579 println!("Average mutation size: {} bytes", avg_size);
580 assert!(avg_size > 0);
581 assert!(avg_size < 10_000); }
583
584 #[test]
585 fn test_memtable_get_nonexistent_key() {
586 let memtable = Memtable::new();
587 let key = DecoratedKey::new(12345, vec![0, 0, 0, 99]);
588
589 assert!(memtable.get(&key).is_none());
590 }
591
592 #[test]
593 fn test_memtable_insert_deprecated_api() {
594 let mut memtable = Memtable::new();
595
596 let table_id = TableId::new("test_ks", "test_table");
597 let partition_key = PartitionKey::single("id", Value::Integer(1));
598 let operations = vec![CellOperation::Write {
599 column: "name".to_string(),
600 value: Value::Text("Alice".to_string()),
601 }];
602
603 let mutation = Mutation::new(table_id, partition_key, None, operations, 1234567890, None);
604
605 let result = memtable.insert(mutation);
607 assert!(result.is_err());
608 }
609
610 #[test]
611 fn test_memtable_nested_collection_depth_limit() {
612 let mut nested_value = Value::Integer(42);
614 for _ in 0..40 {
615 nested_value = Value::List(vec![nested_value]);
616 }
617
618 let size = Memtable::estimate_value_size(&nested_value);
620
621 assert!(size > 0);
624 assert!(
625 size >= 1024,
626 "Should use conservative estimate at max depth"
627 );
628 }
629
630 #[test]
631 fn test_memtable_nested_map_depth_limit() {
632 let mut nested_value = Value::Text("bottom".to_string());
634 for _ in 0..35 {
635 nested_value = Value::Map(vec![(Value::Integer(1), nested_value)]);
636 }
637
638 let size = Memtable::estimate_value_size(&nested_value);
640 assert!(size > 0);
641 assert!(
642 size >= 1024,
643 "Should use conservative estimate for deep nesting"
644 );
645 }
646
647 #[test]
648 fn test_memtable_nested_udt_depth_limit() {
649 use crate::types::{UdtField, UdtValue};
650
651 let mut nested_value = Value::Integer(1);
653 for i in 0..35 {
654 let udt = UdtValue {
655 type_name: format!("type_{}", i),
656 keyspace: "test_ks".to_string(),
657 fields: vec![UdtField {
658 name: "field".to_string(),
659 value: Some(nested_value),
660 }],
661 };
662 nested_value = Value::Udt(udt);
663 }
664
665 let size = Memtable::estimate_value_size(&nested_value);
667 assert!(size > 0);
668 assert!(size >= 1024);
669 }
670
671 #[test]
672 fn test_memtable_frozen_nested_depth_limit() {
673 let mut nested_value = Value::Integer(99);
675 for _ in 0..40 {
676 nested_value = Value::Frozen(Box::new(nested_value));
677 }
678
679 let size = Memtable::estimate_value_size(&nested_value);
681 assert!(size > 0);
682 assert!(size >= 1024);
683 }
684
685 #[test]
686 fn test_memtable_mixed_nested_collections() {
687 use crate::types::{UdtField, UdtValue};
688
689 let mut nested_value = Value::Text("base".to_string());
691
692 for i in 0..50 {
694 nested_value = match i % 5 {
695 0 => Value::List(vec![nested_value]),
696 1 => Value::Set(vec![nested_value]),
697 2 => Value::Map(vec![(Value::Integer(i), nested_value)]),
698 3 => Value::Tuple(vec![nested_value]),
699 4 => Value::Udt(UdtValue {
700 type_name: format!("type_{}", i),
701 keyspace: "test_ks".to_string(),
702 fields: vec![UdtField {
703 name: "f".to_string(),
704 value: Some(nested_value),
705 }],
706 }),
707 _ => unreachable!(),
708 };
709 }
710
711 let size = Memtable::estimate_value_size(&nested_value);
713 assert!(size > 0);
714 }
715
716 #[test]
717 fn test_memtable_depth_limit_exact_boundary() {
718 let mut nested_value = Value::Integer(1);
720 for _ in 0..32 {
721 nested_value = Value::List(vec![nested_value]);
722 }
723
724 let size = Memtable::estimate_value_size(&nested_value);
726 assert!(size > 0);
727
728 nested_value = Value::List(vec![nested_value]);
730 let size_over = Memtable::estimate_value_size(&nested_value);
731
732 assert!(size_over >= 1024);
734 }
735
736 #[test]
737 fn test_memtable_shallow_collections_unaffected() {
738 let simple_list = Value::List(vec![
742 Value::Integer(1),
743 Value::Integer(2),
744 Value::Integer(3),
745 ]);
746 let size = Memtable::estimate_value_size(&simple_list);
747 assert_eq!(size, 12 + 16); let shallow_nested =
751 Value::List(vec![Value::List(vec![Value::List(vec![Value::Integer(
752 1,
753 )])])]);
754 let size = Memtable::estimate_value_size(&shallow_nested);
755 assert!(size > 0);
756 assert!(size < 1024); }
758}