Skip to main content

cqlite_core/storage/write_engine/
memtable.rs

1//! In-memory write buffer (memtable)
2//!
3//! Stores mutations in memory using a BTreeMap for partition and clustering ordering.
4//! Flushes to L0 SSTable when size threshold is reached.
5//!
6//! The memtable maintains mutations in token-sorted order (via DecoratedKey) and tracks
7//! approximate memory usage to trigger flushes at a configurable threshold.
8
9use crate::error::Result;
10use crate::storage::write_engine::mutation::{DecoratedKey, Mutation};
11use std::collections::BTreeMap;
12
/// In-memory write buffer
///
/// Stores mutations in memory, grouped per partition and ordered by
/// `DecoratedKey` (the `BTreeMap` key). Each partition can have multiple
/// mutations (e.g., multiple rows with different clustering keys); they are
/// kept in insertion order inside the partition's vector.
///
/// The size and row counters are bookkeeping values maintained by
/// `insert_with_key()` and reset by `clear()`; they are estimates, not exact
/// heap measurements.
#[derive(Debug)]
pub struct Memtable {
    /// Partition-level storage: map from decorated key to every mutation
    /// inserted for that partition, iterated in `DecoratedKey` order
    data: BTreeMap<DecoratedKey, Vec<Mutation>>,
    /// Approximate size in bytes: running sum of the per-mutation estimates
    /// added on each insert
    size_bytes: usize,
    /// Approximate row count (total mutations across all partitions);
    /// incremented once per inserted mutation, not per distinct row
    row_count: usize,
    /// Creation timestamp (Unix epoch microseconds), captured in `new()` and
    /// left untouched by `clear()`
    created_at: i64,
}
28
29impl Memtable {
30    /// Create a new memtable
31    pub fn new() -> Self {
32        Self {
33            data: BTreeMap::new(),
34            size_bytes: 0,
35            row_count: 0,
36            created_at: Self::current_timestamp_micros(),
37        }
38    }
39
40    /// Insert a mutation into the memtable
41    ///
42    /// Mutations are grouped by partition key (DecoratedKey). Multiple mutations
43    /// for the same partition are stored as a vector.
44    pub fn insert(&mut self, _mutation: Mutation) -> Result<()> {
45        // Calculate decorated key from partition key
46        // Note: This requires schema, but mutation doesn't store it.
47        // For now, we expect the mutation to be pre-validated and the key
48        // to be extractable. In practice, the caller will need to provide
49        // the decorated key or schema context.
50        //
51        // WORKAROUND: Since Mutation doesn't store DecoratedKey directly,
52        // and calculating it requires schema, we need to pass the key separately.
53        // For Issue #362, we'll implement a public API that accepts DecoratedKey.
54        //
55        // This is a design limitation that will be addressed in the full WriteEngine.
56        // For now, insert_with_key() is the primary API.
57
58        // This method is kept for API compatibility but requires rethinking.
59        // We'll implement the core logic in insert_with_key() below.
60        Err(crate::error::Error::InvalidInput(
61            "Use insert_with_key() - decorated key must be provided with mutation".to_string(),
62        ))
63    }
64
65    /// Insert a mutation with an explicit decorated key
66    ///
67    /// This is the primary insertion API. The caller is responsible for computing
68    /// the decorated key from the partition key using the table schema.
69    pub fn insert_with_key(&mut self, key: DecoratedKey, mutation: Mutation) -> Result<()> {
70        // Calculate mutation size (conservative estimate)
71        let mutation_size = Self::estimate_mutation_size(&mutation);
72
73        // Get or create mutation list for this partition
74        let mutations = self.data.entry(key).or_default();
75
76        // Add mutation
77        mutations.push(mutation);
78        self.row_count += 1;
79        self.size_bytes += mutation_size;
80
81        Ok(())
82    }
83
84    /// Get all mutations for a given partition key
85    pub fn get(&self, key: &DecoratedKey) -> Option<&[Mutation]> {
86        self.data.get(key).map(|v| v.as_slice())
87    }
88
89    /// Check if memtable is empty
90    pub fn is_empty(&self) -> bool {
91        self.data.is_empty()
92    }
93
94    /// Get current size in bytes (approximate)
95    pub fn size_bytes(&self) -> usize {
96        self.size_bytes
97    }
98
99    /// Get approximate row count
100    pub fn row_count(&self) -> usize {
101        self.row_count
102    }
103
104    /// Check if memtable should be flushed
105    pub fn should_flush(&self, threshold_bytes: usize) -> bool {
106        self.size_bytes >= threshold_bytes
107    }
108
109    /// Get creation timestamp (microseconds since Unix epoch)
110    pub fn created_at(&self) -> i64 {
111        self.created_at
112    }
113
114    /// Iterate over all partitions and their mutations
115    ///
116    /// Returns an iterator over (DecoratedKey, mutations) pairs in token order.
117    pub fn iter(&self) -> impl Iterator<Item = (&DecoratedKey, &[Mutation])> {
118        self.data.iter().map(|(k, v)| (k, v.as_slice()))
119    }
120
121    /// Clear all data from the memtable
122    ///
123    /// Used after successful flush to SSTable.
124    pub fn clear(&mut self) {
125        self.data.clear();
126        self.size_bytes = 0;
127        self.row_count = 0;
128        // Keep created_at unchanged - represents original creation time
129    }
130
131    /// Maximum nesting depth for collection size estimation (prevents stack overflow)
132    const MAX_NESTING_DEPTH: usize = 32;
133
134    /// Estimate the size of a mutation in bytes
135    ///
136    /// Conservative estimate includes:
137    /// - Fixed overhead per mutation (48 bytes for struct fields)
138    /// - Partition key size (key bytes)
139    /// - Clustering key size (if present)
140    /// - Cell operation sizes (column names + values)
141    fn estimate_mutation_size(mutation: &Mutation) -> usize {
142        let mut size = 48; // Base struct overhead
143
144        // Partition key size
145        for (col_name, value) in &mutation.partition_key.columns {
146            size += col_name.len();
147            size += Self::estimate_value_size_with_depth(value, 0);
148        }
149
150        // Clustering key size
151        if let Some(ref clustering_key) = mutation.clustering_key {
152            for (col_name, value) in &clustering_key.columns {
153                size += col_name.len();
154                size += Self::estimate_value_size_with_depth(value, 0);
155            }
156        }
157
158        // Cell operations
159        for op in &mutation.operations {
160            size += Self::estimate_operation_size(op);
161        }
162
163        size
164    }
165
166    /// Estimate the size of a CQL value in bytes
167    fn estimate_value_size(value: &crate::types::Value) -> usize {
168        Self::estimate_value_size_with_depth(value, 0)
169    }
170
171    /// Estimate the size of a CQL value in bytes with depth tracking
172    ///
173    /// Limits recursion depth to prevent stack overflow from deeply nested collections.
174    /// When max depth is reached, returns a conservative fixed estimate.
175    fn estimate_value_size_with_depth(value: &crate::types::Value, depth: usize) -> usize {
176        use crate::types::Value;
177
178        // Prevent excessive recursion for deeply nested structures
179        if depth >= Self::MAX_NESTING_DEPTH {
180            // Return conservative estimate: assume 1KB for deeply nested value
181            return 1024;
182        }
183
184        match value {
185            Value::Null => 0,
186            Value::Boolean(_) => 1,
187            Value::TinyInt(_) => 1,
188            Value::SmallInt(_) => 2,
189            Value::Integer(_) => 4,
190            Value::BigInt(_) | Value::Counter(_) | Value::Timestamp(_) | Value::Time(_) => 8,
191            Value::Float32(_) => 4,
192            Value::Float(_) => 8,
193            Value::Text(s) => s.len(),
194            Value::Blob(bytes) | Value::Varint(bytes) | Value::Inet(bytes) => bytes.len(),
195            Value::Uuid(_) => 16,
196            Value::Date(_) => 4,
197            Value::Decimal { scale: _, unscaled } => 4 + unscaled.len(),
198            Value::Duration { .. } => 16,
199            Value::List(items) => {
200                items
201                    .iter()
202                    .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
203                    .sum::<usize>()
204                    + 16
205            }
206            Value::Set(items) => {
207                items
208                    .iter()
209                    .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
210                    .sum::<usize>()
211                    + 16
212            }
213            Value::Map(entries) => {
214                entries
215                    .iter()
216                    .map(|(k, v)| {
217                        Self::estimate_value_size_with_depth(k, depth + 1)
218                            + Self::estimate_value_size_with_depth(v, depth + 1)
219                    })
220                    .sum::<usize>()
221                    + 16
222            }
223            Value::Udt(udt) => {
224                udt.fields
225                    .iter()
226                    .map(|field| {
227                        field.name.len()
228                            + field
229                                .value
230                                .as_ref()
231                                .map_or(0, |v| Self::estimate_value_size_with_depth(v, depth + 1))
232                    })
233                    .sum::<usize>()
234                    + 16
235            }
236            Value::Tuple(items) => {
237                items
238                    .iter()
239                    .map(|item| Self::estimate_value_size_with_depth(item, depth + 1))
240                    .sum::<usize>()
241                    + 16
242            }
243            Value::Json(json) => json.to_string().len(),
244            Value::Frozen(inner) => Self::estimate_value_size_with_depth(inner, depth + 1) + 8,
245            Value::Tombstone(_) => 24, // timestamp + type + ttl overhead
246        }
247    }
248
249    /// Estimate the size of a cell operation
250    fn estimate_operation_size(
251        op: &crate::storage::write_engine::mutation::CellOperation,
252    ) -> usize {
253        use crate::storage::write_engine::mutation::CellOperation;
254
255        match op {
256            CellOperation::Write { column, value } => {
257                column.len() + Self::estimate_value_size(value) + 8 // +8 for overhead
258            }
259            CellOperation::WriteWithTtl {
260                column,
261                value,
262                ttl_seconds: _,
263            } => {
264                // TTL cells: same as Write + 4 bytes for TTL + 4 bytes for local_deletion_time
265                column.len() + Self::estimate_value_size(value) + 16
266            }
267            CellOperation::Delete { column } => column.len() + 8,
268            CellOperation::DeleteRow => 8,
269        }
270    }
271
272    /// Get current timestamp in microseconds since Unix epoch
273    fn current_timestamp_micros() -> i64 {
274        std::time::SystemTime::now()
275            .duration_since(std::time::UNIX_EPOCH)
276            .unwrap_or_default()
277            .as_micros() as i64
278    }
279}
280
281impl Default for Memtable {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::write_engine::mutation::{
        CellOperation, ClusteringKey, PartitionKey, TableId,
    };
    use crate::types::Value;

    /// Build a `(decorated key, mutation)` pair for partition `id`.
    ///
    /// The mutation writes a single `name` text cell and optionally carries a
    /// `ts` clustering key. The decorated key is derived from the big-endian
    /// bytes of `id`, so equal ids always map to the same partition.
    fn create_test_mutation(
        id: i32,
        name: &str,
        clustering_val: Option<i64>,
    ) -> (DecoratedKey, Mutation) {
        let table_id = TableId::new("test_ks", "test_table");
        let partition_key = PartitionKey::single("id", Value::Integer(id));

        // Calculate decorated key from partition key bytes
        let key_bytes = id.to_be_bytes().to_vec();
        let decorated_key = DecoratedKey::from_key_bytes(key_bytes).unwrap();

        let clustering_key =
            clustering_val.map(|val| ClusteringKey::single("ts", Value::BigInt(val)));

        let operations = vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text(name.to_string()),
        }];

        let mutation = Mutation::new(
            table_id,
            partition_key,
            clustering_key,
            operations,
            1234567890,
            None,
        );

        (decorated_key, mutation)
    }

    #[test]
    fn test_memtable_new() {
        let memtable = Memtable::new();
        assert!(memtable.is_empty());
        assert_eq!(memtable.size_bytes(), 0);
        assert_eq!(memtable.row_count(), 0);
        assert!(memtable.created_at() > 0);
    }

    #[test]
    fn test_memtable_insert_and_get() {
        let mut memtable = Memtable::new();

        let (key, mutation) = create_test_mutation(1, "Alice", None);
        memtable.insert_with_key(key.clone(), mutation).unwrap();

        assert!(!memtable.is_empty());
        assert_eq!(memtable.row_count(), 1);
        assert!(memtable.size_bytes() > 0);

        // Retrieve mutation
        let mutations = memtable.get(&key).unwrap();
        assert_eq!(mutations.len(), 1);
        assert_eq!(mutations[0].table.table, "test_table");
    }

    #[test]
    fn test_memtable_multiple_mutations_same_partition() {
        let mut memtable = Memtable::new();

        // Insert multiple mutations for same partition (different clustering keys)
        let (key, mutation1) = create_test_mutation(1, "Alice", Some(1000));
        let (_, mutation2) = create_test_mutation(1, "Alice Updated", Some(2000));

        memtable.insert_with_key(key.clone(), mutation1).unwrap();
        memtable.insert_with_key(key.clone(), mutation2).unwrap();

        // row_count counts mutations, not partitions
        assert_eq!(memtable.row_count(), 2);

        // Both mutations should be stored for this partition
        let mutations = memtable.get(&key).unwrap();
        assert_eq!(mutations.len(), 2);
    }

    #[test]
    fn test_memtable_multiple_partitions() {
        let mut memtable = Memtable::new();

        let (key1, mutation1) = create_test_mutation(1, "Alice", None);
        let (key2, mutation2) = create_test_mutation(2, "Bob", None);
        let (key3, mutation3) = create_test_mutation(3, "Charlie", None);

        memtable.insert_with_key(key1, mutation1).unwrap();
        memtable.insert_with_key(key2, mutation2).unwrap();
        memtable.insert_with_key(key3, mutation3).unwrap();

        assert_eq!(memtable.row_count(), 3);
        assert!(!memtable.is_empty());
    }

    #[test]
    fn test_memtable_token_ordering() {
        let mut memtable = Memtable::new();

        // Insert in non-sorted order
        let (key3, mutation3) = create_test_mutation(300, "Charlie", None);
        let (key1, mutation1) = create_test_mutation(100, "Alice", None);
        let (key2, mutation2) = create_test_mutation(200, "Bob", None);

        memtable.insert_with_key(key3.clone(), mutation3).unwrap();
        memtable.insert_with_key(key1.clone(), mutation1).unwrap();
        memtable.insert_with_key(key2.clone(), mutation2).unwrap();

        // Verify iteration returns partitions in token order
        let keys: Vec<_> = memtable.iter().map(|(k, _)| k.token).collect();
        assert_eq!(keys.len(), 3);

        // Keys should be sorted by token
        assert!(keys.windows(2).all(|w| w[0] <= w[1]));
    }

    #[test]
    fn test_memtable_size_tracking() {
        let mut memtable = Memtable::new();

        let initial_size = memtable.size_bytes();
        assert_eq!(initial_size, 0);

        // Insert mutation
        let (key, mutation) = create_test_mutation(1, "Alice", None);
        memtable.insert_with_key(key, mutation).unwrap();

        // Size should increase
        assert!(memtable.size_bytes() > initial_size);
        let size_after_insert = memtable.size_bytes();

        // Insert another mutation - size should increase more
        let (key2, mutation2) = create_test_mutation(2, "Bob with a longer name", None);
        memtable.insert_with_key(key2, mutation2).unwrap();

        assert!(memtable.size_bytes() > size_after_insert);
    }

    #[test]
    fn test_memtable_should_flush() {
        let mut memtable = Memtable::new();

        // Should not flush when empty
        assert!(!memtable.should_flush(1024));

        // Fill the memtable so that it has a non-trivial size
        for i in 0..100 {
            let (key, mutation) = create_test_mutation(i, "Test data", None);
            memtable.insert_with_key(key, mutation).unwrap();
        }

        // The threshold comparison is inclusive: flush at or above the threshold
        let current_size = memtable.size_bytes();
        assert!(memtable.should_flush(current_size - 1));
        assert!(memtable.should_flush(current_size));
        assert!(!memtable.should_flush(current_size + 1));
        assert!(!memtable.should_flush(current_size + 1000));
    }

    #[test]
    fn test_memtable_clear() {
        let mut memtable = Memtable::new();

        let created_at = memtable.created_at();

        // Insert some data
        let (key, mutation) = create_test_mutation(1, "Alice", None);
        memtable.insert_with_key(key, mutation).unwrap();

        assert!(!memtable.is_empty());
        assert!(memtable.size_bytes() > 0);
        assert!(memtable.row_count() > 0);

        // Clear
        memtable.clear();

        assert!(memtable.is_empty());
        assert_eq!(memtable.size_bytes(), 0);
        assert_eq!(memtable.row_count(), 0);
        assert_eq!(memtable.created_at(), created_at); // Timestamp unchanged
    }

    #[test]
    fn test_memtable_iterator() {
        let mut memtable = Memtable::new();

        // Insert multiple partitions
        let (key1, mutation1) = create_test_mutation(1, "Alice", None);
        let (key2, mutation2) = create_test_mutation(2, "Bob", None);

        memtable.insert_with_key(key1.clone(), mutation1).unwrap();
        memtable.insert_with_key(key2.clone(), mutation2).unwrap();

        // Iterate and verify
        let mut count = 0;
        for (key, mutations) in memtable.iter() {
            assert!(!mutations.is_empty());
            assert!([key1.token, key2.token].contains(&key.token));
            count += 1;
        }

        assert_eq!(count, 2);
    }

    #[test]
    fn test_memtable_empty_check() {
        let mut memtable = Memtable::new();
        assert!(memtable.is_empty());

        let (key, mutation) = create_test_mutation(1, "Alice", None);
        memtable.insert_with_key(key, mutation).unwrap();
        assert!(!memtable.is_empty());

        memtable.clear();
        assert!(memtable.is_empty());
    }

    #[test]
    fn test_memtable_size_estimates() {
        // Test size estimation for different value types
        let small_text = Value::Text("hi".to_string());
        let large_text = Value::Text("a".repeat(1000));
        let integer = Value::Integer(42);
        let uuid = Value::Uuid([0u8; 16]);

        assert_eq!(Memtable::estimate_value_size(&small_text), 2);
        assert_eq!(Memtable::estimate_value_size(&large_text), 1000);
        assert_eq!(Memtable::estimate_value_size(&integer), 4);
        assert_eq!(Memtable::estimate_value_size(&uuid), 16);
    }

    #[test]
    fn test_memtable_collection_size_estimates() {
        // Every collection is charged its children plus a fixed 16 bytes.

        // List
        let list = Value::List(vec![
            Value::Integer(1),
            Value::Integer(2),
            Value::Integer(3),
        ]);
        let size = Memtable::estimate_value_size(&list);
        assert_eq!(size, 3 * 4 + 16);

        // Set
        let set = Value::Set(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
        ]);
        let size = Memtable::estimate_value_size(&set);
        assert_eq!(size, 2 + 16); // two 1-byte strings

        // Map
        let map = Value::Map(vec![
            (Value::Integer(1), Value::Text("one".to_string())),
            (Value::Integer(2), Value::Text("two".to_string())),
        ]);
        let size = Memtable::estimate_value_size(&map);
        assert_eq!(size, 2 * (4 + 3) + 16); // 4-byte key + 3-byte value per entry
    }

    #[test]
    fn test_memtable_realistic_flush_threshold() {
        let mut memtable = Memtable::new();

        // 10K typical mutations must stay below a 64MB flush threshold, i.e.
        // the average estimate per mutation must be below ~6.4KB.
        let flush_threshold = 64 * 1024 * 1024; // 64MB

        // Insert 10K typical mutations
        for i in 0..10_000 {
            let (key, mutation) = create_test_mutation(
                i,
                "Typical user data with moderate length name",
                Some(i as i64),
            );
            memtable.insert_with_key(key, mutation).unwrap();
        }

        let final_size = memtable.size_bytes();
        println!(
            "10K mutations size: {} bytes ({} KB)",
            final_size,
            final_size / 1024
        );

        // Should be well under 64MB for 10K mutations
        assert!(final_size < flush_threshold);

        // Verify avg size per mutation is reasonable
        let avg_size = final_size / 10_000;
        println!("Average mutation size: {} bytes", avg_size);
        assert!(avg_size > 0);
        assert!(avg_size < 10_000); // Loose sanity bound on the per-mutation estimate
    }

    #[test]
    fn test_memtable_get_nonexistent_key() {
        let memtable = Memtable::new();
        let key = DecoratedKey::new(12345, vec![0, 0, 0, 99]);

        assert!(memtable.get(&key).is_none());
    }

    #[test]
    fn test_memtable_insert_deprecated_api() {
        let mut memtable = Memtable::new();

        let table_id = TableId::new("test_ks", "test_table");
        let partition_key = PartitionKey::single("id", Value::Integer(1));
        let operations = vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }];

        let mutation = Mutation::new(table_id, partition_key, None, operations, 1234567890, None);

        // Key-less insert() is always rejected...
        let result = memtable.insert(mutation);
        assert!(result.is_err());

        // ...and must leave the memtable untouched.
        assert!(memtable.is_empty());
        assert_eq!(memtable.size_bytes(), 0);
        assert_eq!(memtable.row_count(), 0);
    }

    #[test]
    fn test_memtable_nested_collection_depth_limit() {
        // Create a deeply nested list structure (40 levels deep)
        let mut nested_value = Value::Integer(42);
        for _ in 0..40 {
            nested_value = Value::List(vec![nested_value]);
        }

        // Should not panic and should return conservative estimate
        let size = Memtable::estimate_value_size(&nested_value);

        // Lists at depths 0..=31 are measured normally (16 bytes of overhead
        // each). The list found at depth 32 is not inspected at all: it is
        // charged a single flat 1024 bytes and everything below it is ignored.
        assert_eq!(size, Memtable::MAX_NESTING_DEPTH * 16 + 1024);
    }

    #[test]
    fn test_memtable_nested_map_depth_limit() {
        // Create deeply nested map structure
        let mut nested_value = Value::Text("bottom".to_string());
        for _ in 0..35 {
            nested_value = Value::Map(vec![(Value::Integer(1), nested_value)]);
        }

        // Should not panic
        let size = Memtable::estimate_value_size(&nested_value);
        assert!(size > 0);
        assert!(
            size >= 1024,
            "Should use conservative estimate for deep nesting"
        );
    }

    #[test]
    fn test_memtable_nested_udt_depth_limit() {
        use crate::types::{UdtField, UdtValue};

        // Create deeply nested UDT structure
        let mut nested_value = Value::Integer(1);
        for i in 0..35 {
            let udt = UdtValue {
                type_name: format!("type_{}", i),
                keyspace: "test_ks".to_string(),
                fields: vec![UdtField {
                    name: "field".to_string(),
                    value: Some(nested_value),
                }],
            };
            nested_value = Value::Udt(udt);
        }

        // Should not panic
        let size = Memtable::estimate_value_size(&nested_value);
        assert!(size > 0);
        assert!(size >= 1024);
    }

    #[test]
    fn test_memtable_frozen_nested_depth_limit() {
        // Create deeply nested frozen values
        let mut nested_value = Value::Integer(99);
        for _ in 0..40 {
            nested_value = Value::Frozen(Box::new(nested_value));
        }

        // Should not panic or cause stack overflow
        let size = Memtable::estimate_value_size(&nested_value);

        // Frozen wrappers at depths 0..=31 add 8 bytes each; the wrapper at
        // depth 32 is replaced by the flat 1024-byte estimate.
        assert_eq!(size, Memtable::MAX_NESTING_DEPTH * 8 + 1024);
    }

    #[test]
    fn test_memtable_mixed_nested_collections() {
        use crate::types::{UdtField, UdtValue};

        // Create a complex nested structure mixing different types
        let mut nested_value = Value::Text("base".to_string());

        // Alternate between different collection types
        for i in 0..50 {
            nested_value = match i % 5 {
                0 => Value::List(vec![nested_value]),
                1 => Value::Set(vec![nested_value]),
                2 => Value::Map(vec![(Value::Integer(i), nested_value)]),
                3 => Value::Tuple(vec![nested_value]),
                4 => Value::Udt(UdtValue {
                    type_name: format!("type_{}", i),
                    keyspace: "test_ks".to_string(),
                    fields: vec![UdtField {
                        name: "f".to_string(),
                        value: Some(nested_value),
                    }],
                }),
                _ => unreachable!(),
            };
        }

        // Should handle mixed nesting without panic
        let size = Memtable::estimate_value_size(&nested_value);
        assert!(size > 0);
    }

    #[test]
    fn test_memtable_depth_limit_exact_boundary() {
        let limit = Memtable::MAX_NESTING_DEPTH;

        // With `limit - 1` wrappers the leaf sits at depth `limit - 1`, the
        // deepest level that is still measured exactly.
        let mut nested_value = Value::Integer(1);
        for _ in 0..limit - 1 {
            nested_value = Value::List(vec![nested_value]);
        }
        let size_under = Memtable::estimate_value_size(&nested_value);
        assert_eq!(size_under, (limit - 1) * 16 + 4);

        // One more wrapper pushes the leaf to depth `limit`: even a 4-byte
        // integer is now replaced by the flat 1024-byte estimate.
        nested_value = Value::List(vec![nested_value]);
        let size_at = Memtable::estimate_value_size(&nested_value);
        assert_eq!(size_at, limit * 16 + 1024);

        // Wrapping further does not grow the estimate: the list that lands on
        // depth `limit` is capped and nothing beneath it is visited.
        nested_value = Value::List(vec![nested_value]);
        let size_over = Memtable::estimate_value_size(&nested_value);
        assert_eq!(size_over, size_at);
    }

    #[test]
    fn test_memtable_shallow_collections_unaffected() {
        // Verify shallow collections are not affected by depth limit

        // Simple list
        let simple_list = Value::List(vec![
            Value::Integer(1),
            Value::Integer(2),
            Value::Integer(3),
        ]);
        let size = Memtable::estimate_value_size(&simple_list);
        assert_eq!(size, 12 + 16); // 3 * 4 bytes + overhead

        // Nested but shallow (3 levels)
        let shallow_nested =
            Value::List(vec![Value::List(vec![Value::List(vec![Value::Integer(
                1,
            )])])]);
        let size = Memtable::estimate_value_size(&shallow_nested);
        assert_eq!(size, 3 * 16 + 4); // three list overheads + one integer, no cap applied
    }
}