motedb 0.1.6

AI-native embedded multimodal database for embodied intelligence (robots, AR glasses, industrial arms).
Documentation
//! MemTable: In-memory write buffer backed by a sorted `BTreeMap` (a skip list is a possible future replacement)
//!
//! ## Performance
//! - Write: O(log n), ~10μs
//! - Read: O(log n), ~1μs
//! - Capacity: 4MB (50K entries)

use super::{Key, Value, LSMConfig};
use crate::Result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use std::collections::BTreeMap;

/// In-memory write buffer
///
/// Entries live in a sorted map behind a read-write lock. A running byte
/// count is kept alongside so `should_flush` can compare it with the
/// configured limit.
pub struct MemTable {
    /// Sorted key-value map, shared through an `Arc` and guarded by an `RwLock`.
    /// Currently a `BTreeMap` (consider a skip list for better concurrent performance in future).
    data: Arc<RwLock<BTreeMap<Key, Value>>>,
    
    /// Current size in bytes, as accounted by the write path
    /// (8 bytes per key + payload length + 16 bytes of metadata per entry).
    size: AtomicUsize,
    
    /// Maximum size before flush (copied from `LSMConfig::memtable_size`)
    max_size: usize,
    
    /// Sequence counter: advanced once per inserted entry by `put`/`batch_put`.
    /// Nothing in this file reads it back.
    next_seq: AtomicUsize,
}

impl MemTable {
    /// Build an empty MemTable whose flush threshold is taken from
    /// `config.memtable_size`.
    ///
    /// The byte counter and the sequence counter both start at zero.
    pub fn new(config: &LSMConfig) -> Self {
        let max_size = config.memtable_size;
        let data = Arc::new(RwLock::new(BTreeMap::new()));

        Self {
            data,
            size: AtomicUsize::new(0),
            max_size,
            next_seq: AtomicUsize::new(0),
        }
    }
    
    /// Insert a key-value pair, replacing any existing value stored under `key`.
    ///
    /// The tracked byte size is adjusted by the difference between the new
    /// entry and the entry it replaces (if any), and the sequence counter is
    /// advanced by one.
    ///
    /// # Errors
    /// This implementation always returns `Ok(())`.
    pub fn put(&self, key: Key, value: Value) -> Result<()> {
        // Fixed per-entry overhead: 8 bytes for the u64 key plus 16 bytes of
        // value metadata. Only the payload length varies between entries.
        const ENTRY_OVERHEAD: usize = 8 + 16;

        let new_size = ENTRY_OVERHEAD + value.data.len();

        let mut data = self.data.write();

        // `insert` hands back the value it displaced, so a single tree lookup
        // covers both the "was there an old entry?" check and the write.
        let old_size = data
            .insert(key, value)
            .map_or(0, |old| ENTRY_OVERHEAD + old.data.len());

        // Apply the net change as one atomic update so readers of `size`
        // never observe the intermediate "old entry already removed" value.
        if new_size >= old_size {
            self.size.fetch_add(new_size - old_size, Ordering::Relaxed);
        } else {
            self.size.fetch_sub(old_size - new_size, Ordering::Relaxed);
        }
        self.next_seq.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }
    
    /// Insert many key-value pairs while taking the write lock only once.
    ///
    /// ## Behavior
    /// - Entries are applied in slice order, so when a key occurs more than
    ///   once in `kvs` the last occurrence wins.
    /// - The tracked byte size is updated with a single atomic operation after
    ///   all entries have been inserted.
    /// - The sequence counter is advanced by `kvs.len()`.
    /// - An empty slice returns immediately without touching the lock.
    ///
    /// ## Expected effect
    /// - 1000 inserts: 1000 lock acquisitions → 1 lock acquisition
    /// - The original author estimated a 3-5x speedup (not verified here).
    ///
    /// # Errors
    /// This implementation always returns `Ok(())`.
    pub fn batch_put(&self, kvs: &[(Key, Value)]) -> Result<()> {
        // Fixed per-entry overhead: 8 bytes for the u64 key plus 16 bytes of
        // value metadata. Only the payload length varies between entries.
        const ENTRY_OVERHEAD: usize = 8 + 16;

        if kvs.is_empty() {
            return Ok(());
        }

        let mut data = self.data.write();

        // Bytes added and bytes displaced are tracked separately as unsigned
        // totals, which avoids signed casts of `usize` values.
        let mut added: usize = 0;
        let mut removed: usize = 0;

        for (key, value) in kvs {
            added += ENTRY_OVERHEAD + value.data.len();

            // `insert` returns the displaced value, so no separate lookup is
            // needed to find out whether this key was already present.
            if let Some(old) = data.insert(*key, value.clone()) {
                removed += ENTRY_OVERHEAD + old.data.len();
            }
        }

        // Batch update size (single atomic operation; skipped when the net
        // change is zero).
        if added > removed {
            self.size.fetch_add(added - removed, Ordering::Relaxed);
        } else if removed > added {
            self.size.fetch_sub(removed - added, Ordering::Relaxed);
        }

        self.next_seq.fetch_add(kvs.len(), Ordering::Relaxed);

        Ok(())
    }
    
    /// Look up `key` and return a clone of the stored value, if any.
    ///
    /// Tombstones are returned like any other value; callers inspect
    /// `Value::deleted` themselves. The read lock is held only for the
    /// duration of the lookup and clone.
    pub fn get(&self, key: Key) -> Result<Option<Value>> {
        Ok(self.data.read().get(&key).cloned())
    }
    
    /// Mark `key` as deleted by storing a tombstone value stamped with
    /// `timestamp`.
    ///
    /// The entry is not removed from the map; it is overwritten via `put`.
    pub fn delete(&self, key: Key, timestamp: u64) -> Result<()> {
        let tombstone = Value::tombstone(timestamp);
        self.put(key, tombstone)
    }
    
    /// Report whether the tracked byte size has reached the configured
    /// maximum (`size >= max_size`).
    pub fn should_flush(&self) -> bool {
        let used_bytes = self.size.load(Ordering::Relaxed);
        used_bytes >= self.max_size
    }
    
    /// Get current size in bytes
    pub fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }
    
    /// Return how many entries the map holds.
    ///
    /// Tombstones are stored as ordinary entries, so they are counted too.
    pub fn len(&self) -> usize {
        let guard = self.data.read();
        guard.len()
    }
    
    /// Return `true` when the map holds no entries at all (tombstones count
    /// as entries).
    pub fn is_empty(&self) -> bool {
        self.data.read().is_empty()
    }
    
    /// Iterate over all entries (for flushing to SSTable)
    /// OPTIMIZED: O(n) instead of O(n²)
    pub fn iter(&self) -> MemTableIteratorOptimized {
        MemTableIteratorOptimized::new(self.data.clone())
    }
    
    /// Copy every entry (tombstones included) into a `Vec`, in key order.
    ///
    /// Intended for tests; the read lock is held while the copy is made.
    pub fn snapshot(&self) -> Vec<(Key, Value)> {
        let guard = self.data.read();

        let mut copied = Vec::with_capacity(guard.len());
        for (key, value) in guard.iter() {
            copied.push((*key, value.clone()));
        }
        copied
    }
    
    /// Scan the half-open key range `[start, end)` and hand each live entry
    /// to `f` by reference.
    ///
    /// - Entries are visited in ascending key order.
    /// - Tombstones (`deleted == true`) are skipped.
    /// - No intermediate `Vec` is built; `f` borrows each value in place.
    /// - An empty or inverted range (`start >= end`) visits nothing and
    ///   returns `Ok(())`.
    ///
    /// The read lock is held for the whole scan, including while `f` runs,
    /// so `f` should not call back into methods that need the write lock
    /// (for example `put`).
    ///
    /// # Errors
    /// Stops at, and returns, the first error produced by `f`.
    pub fn scan_with<F>(&self, start: Key, end: Key, mut f: F) -> Result<()>
    where
        F: FnMut(Key, &Value) -> Result<()>,
    {
        // `BTreeMap::range` panics when the start bound is greater than the
        // end bound, so an inverted range is answered as empty up front.
        if start >= end {
            return Ok(());
        }

        let data = self.data.read();

        // Range lookup on the sorted map: O(log n + k).
        for (k, v) in data.range(start..end) {
            // Skip tombstones (deleted entries)
            if !v.deleted {
                f(*k, v)?;
            }
        }

        Ok(())
    }
    
    /// Scan the half-open key range `[start, end)` and collect the live
    /// entries into a `Vec` - Legacy API (allocates and clones).
    ///
    /// Prefer `scan_with()` when the caller can process entries by reference.
    /// An empty or inverted range (`start >= end`) yields an empty `Vec`.
    ///
    /// # Errors
    /// Propagates any error returned by `scan_with`.
    pub fn scan(&self, start: Key, end: Key) -> Result<Vec<(Key, Value)>> {
        // Nothing to scan; this also guards the subtraction below against
        // underflow when `end < start`.
        if start >= end {
            return Ok(Vec::new());
        }

        // Pre-allocate from the width of the key range, capped at 1000.
        // The cap is applied before narrowing to `usize`, so a wide range
        // cannot be truncated to a misleading small number on 32-bit targets.
        let estimated_size = (end - start).min(1000) as usize;
        let mut results = Vec::with_capacity(estimated_size);
        self.scan_with(start, end, |k, v| {
            results.push((k, v.clone()));
            Ok(())
        })?;
        Ok(results)
    }
    
    /// Walk every live entry in ascending key order, passing each one to `f`
    /// by reference.
    ///
    /// Tombstones are skipped and no intermediate `Vec` is allocated. The
    /// read lock stays held for the whole walk, including while `f` runs.
    ///
    /// # Errors
    /// Stops at, and returns, the first error produced by `f`.
    pub fn scan_all_with<F>(&self, mut f: F) -> Result<()>
    where
        F: FnMut(Key, &Value) -> Result<()>,
    {
        let guard = self.data.read();

        for entry in guard.iter() {
            let (key, value) = entry;
            if value.deleted {
                // Deleted entries are invisible to scans.
                continue;
            }
            f(*key, value)?;
        }

        Ok(())
    }
    
    /// Collect every live entry into a `Vec` (full table scan) - Legacy API.
    ///
    /// Prefer `scan_all_with()` when entries can be processed by reference;
    /// this variant clones each value. Tombstones are not included.
    pub fn scan_all(&self) -> Result<Vec<(Key, Value)>> {
        // Start with room for 1000 entries to limit early reallocations.
        let mut collected: Vec<(Key, Value)> = Vec::with_capacity(1000);

        let gather = |key: Key, value: &Value| -> Result<()> {
            collected.push((key, value.clone()));
            Ok(())
        };
        self.scan_all_with(gather)?;

        Ok(collected)
    }
}

/// Legacy iterator - O(n²) performance, kept for compatibility
/// Use MemTableIteratorOptimized instead for O(n) performance
///
/// Each call to `next` re-locks the shared map and walks to the `index`-th
/// entry, which is where the quadratic cost comes from.
#[allow(dead_code)]
pub struct MemTableIterator {
    /// Shared handle to the MemTable's sorted map.
    data: Arc<RwLock<BTreeMap<Key, Value>>>,
    /// Zero-based position (in key order) of the next entry to yield.
    index: usize,
}

#[allow(dead_code)]
impl Iterator for MemTableIterator {
    type Item = (Key, Value);

    /// Yield a copy of the entry at the current position and advance by one.
    ///
    /// Returns `None` (without advancing) once the position is past the last
    /// entry currently in the map.
    fn next(&mut self) -> Option<Self::Item> {
        let guard = self.data.read();

        // `nth` restarts from the first entry on every call, so a full pass
        // costs O(n²). Production code should use MemTableIteratorOptimized.
        let (key, value) = guard.iter().nth(self.index)?;
        let copied = (*key, value.clone());

        self.index += 1;
        Some(copied)
    }
}

/// Optimized iterator that clones data once
///
/// `new` copies every entry out of the map while holding the read lock, so
/// iteration afterwards needs no locking and does not see later writes.
pub struct MemTableIteratorOptimized {
    /// Owning iterator over the copied `(key, value)` pairs, in key order.
    entries: std::vec::IntoIter<(Key, Value)>,
}

impl MemTableIteratorOptimized {
    /// Copy all entries out of `data` under a single read lock and wrap them
    /// in an owning iterator.
    ///
    /// Keys are copied and values are cloned; tombstones are included.
    pub fn new(data: Arc<RwLock<BTreeMap<Key, Value>>>) -> Self {
        let guard = data.read();

        let mut copied: Vec<(Key, Value)> = Vec::with_capacity(guard.len());
        for (key, value) in guard.iter() {
            copied.push((*key, value.clone()));
        }

        Self {
            entries: copied.into_iter(),
        }
    }
}

impl Iterator for MemTableIteratorOptimized {
    type Item = (Key, Value);

    /// Yield the next copied entry, in ascending key order.
    fn next(&mut self) -> Option<Self::Item> {
        self.entries.next()
    }

    /// Forward the exact remaining count from the underlying `vec::IntoIter`.
    ///
    /// Without this, the default `(0, None)` hint hides the known length, so
    /// consumers such as `collect()` cannot reserve capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.entries.size_hint()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a MemTable with the default configuration.
    fn create_memtable() -> MemTable {
        MemTable::new(&LSMConfig::default())
    }

    /// Bytes the MemTable accounts for one entry: 8-byte key + 16 bytes of
    /// value metadata + payload length.
    fn entry_size(payload_len: usize) -> usize {
        8 + 16 + payload_len
    }

    /// A stored value can be read back unchanged.
    #[test]
    fn test_put_get() {
        let memtable = create_memtable();

        let key = 12345u64;
        let value = Value::new(b"test_value".to_vec(), 1);

        memtable.put(key, value.clone()).unwrap();

        let retrieved = memtable.get(key).unwrap().unwrap();
        assert_eq!(retrieved.data, value.data);
        assert_eq!(retrieved.timestamp, 1);
        assert!(!retrieved.deleted);
    }

    /// Deleting leaves a tombstone carrying the delete timestamp.
    #[test]
    fn test_delete() {
        let memtable = create_memtable();

        let key = 12345u64;
        memtable.put(key, Value::new(b"value".to_vec(), 1)).unwrap();
        memtable.delete(key, 2).unwrap();

        let retrieved = memtable.get(key).unwrap().unwrap();
        assert!(retrieved.deleted);
        assert_eq!(retrieved.timestamp, 2);
    }

    /// The byte counter reflects exactly the entries currently stored,
    /// including after an existing key is overwritten.
    #[test]
    fn test_size_tracking() {
        let memtable = create_memtable();

        assert_eq!(memtable.size(), 0);

        let key = 123u64;
        let value = Value::new(b"value".to_vec(), 1);
        memtable.put(key, value).unwrap();

        assert_eq!(memtable.size(), entry_size(b"value".len()));

        // Overwriting must drop the old entry's bytes, not just add the new ones.
        let new_value = Value::new(b"new_value".to_vec(), 2);
        memtable.put(key, new_value).unwrap();

        assert_eq!(memtable.len(), 1);
        assert_eq!(memtable.size(), entry_size(b"new_value".len()));
    }

    /// `should_flush` flips once the tracked size reaches the configured limit.
    #[test]
    fn test_should_flush() {
        let mut config = LSMConfig::default();
        config.memtable_size = 100; // Small size for testing
        let memtable = MemTable::new(&config);

        assert!(!memtable.should_flush());

        // Insert data until flush is needed
        for i in 0..10 {
            let key = i as u64;
            let value = Value::new(vec![0u8; 20], i);
            memtable.put(key, value).unwrap();
        }

        assert!(memtable.should_flush());
    }

    /// Iteration yields every entry in ascending key order.
    #[test]
    fn test_iterator() {
        let memtable = create_memtable();

        // Insert data
        for i in 0..5 {
            let key = i as u64; // u64 keys are naturally sorted
            let value = Value::new(format!("value_{}", i).into_bytes(), i as u64);
            memtable.put(key, value).unwrap();
        }

        // Iterate and verify order
        let items: Vec<_> = memtable.iter().collect();
        assert_eq!(items.len(), 5);

        // BTreeMap should maintain sorted order
        for (i, (key, _)) in items.iter().enumerate() {
            let expected_key = i as u64;
            assert_eq!(*key, expected_key);
        }
    }

    /// A batch applies entries in order (last write to a key wins) and keeps
    /// the byte counter consistent with what is stored.
    #[test]
    fn test_batch_put() {
        let memtable = create_memtable();

        // An empty batch is a no-op.
        memtable.batch_put(&[]).unwrap();
        assert!(memtable.is_empty());
        assert_eq!(memtable.size(), 0);

        let batch = vec![
            (1u64, Value::new(b"a".to_vec(), 1)),
            (2u64, Value::new(b"bb".to_vec(), 2)),
            (1u64, Value::new(b"ccc".to_vec(), 3)),
        ];
        memtable.batch_put(&batch).unwrap();

        assert_eq!(memtable.len(), 2);

        let winner = memtable.get(1).unwrap().unwrap();
        assert_eq!(winner.data, batch[2].1.data);
        assert_eq!(winner.timestamp, 3);

        assert_eq!(memtable.size(), entry_size(3) + entry_size(2));
    }

    /// Range and full scans hide tombstones, while `len` still counts them.
    #[test]
    fn test_scan_skips_tombstones() {
        let memtable = create_memtable();

        for key in 1u64..=3 {
            memtable.put(key, Value::new(vec![0u8; 4], key)).unwrap();
        }
        memtable.delete(2, 4).unwrap();

        let in_range: Vec<u64> = memtable
            .scan(0, 10)
            .unwrap()
            .into_iter()
            .map(|(k, _)| k)
            .collect();
        assert_eq!(in_range, vec![1, 3]);

        // The end bound is exclusive.
        let narrow: Vec<u64> = memtable
            .scan(1, 3)
            .unwrap()
            .into_iter()
            .map(|(k, _)| k)
            .collect();
        assert_eq!(narrow, vec![1]);

        assert_eq!(memtable.scan_all().unwrap().len(), 2);
        assert_eq!(memtable.len(), 3);
    }
}