motedb 0.1.4

AI-native embedded multimodal database for embodied intelligence (robots, AR glasses, industrial arms).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
//! Row Cache - LRU cache with sequential prefetching
//!
//! **Purpose**: Reduce LSM read latency for hot data
//!
//! **Performance**: Cache hit = <1ยตs, Cache miss = ~46ยตs (LSM read)
//!
//! **Memory**: Default 10,000 rows โ‰ˆ 10MB (assuming 1KB/row average)
//!
//! **P2 Prefetching**: Detects sequential access patterns and prefetches ahead

use crate::types::{Row, RowId};
use dashmap::DashMap;
use lru::LruCache;
use parking_lot::RwLock;
use std::num::NonZeroUsize;
use std::sync::Arc;

/// Row cache key: (table_name, row_id)
pub type CacheKey = (String, RowId);

/// Access pattern tracker for sequential detection
#[derive(Debug, Clone)]
struct AccessPattern {
    /// Last accessed row_id
    last_row_id: RowId,
    /// Detected stride (difference between consecutive accesses)
    stride: i64,
    /// Consecutive sequential accesses count
    sequential_count: usize,
    /// Last access timestamp (for aging)
    last_access: std::time::Instant,
}

/// Row cache with LRU eviction and prefetching
pub struct RowCache {
    /// LRU cache: (table_name, row_id) -> Arc<Row>
    /// 
    /// Using Arc<Row> to allow cheap cloning when returning cached data
    cache: Arc<RwLock<LruCache<CacheKey, Arc<Row>>>>,
    
    /// Statistics
    stats: Arc<RwLock<CacheStats>>,
    
    /// ๐Ÿš€ P2: Access pattern tracker per table - ไฝฟ็”จ DashMap ๆๅ‡ๅนถๅ‘ๆ€ง่ƒฝ
    /// Tracks recent access patterns to detect sequential scans
    access_patterns: Arc<DashMap<String, AccessPattern>>,
    
    /// ๐Ÿš€ P2: Prefetch configuration
    prefetch_config: PrefetchConfig,
}

/// Cache statistics
#[derive(Debug, Default, Clone)]
pub struct CacheStats {
    /// Total cache hits
    pub hits: u64,
    /// Total cache misses
    pub misses: u64,
    /// Current cache size
    pub size: usize,
    /// Maximum cache size
    pub capacity: usize,
    /// ๐Ÿš€ P2: Prefetch statistics
    pub prefetch_triggered: u64,
    pub prefetch_useful: u64,  // Prefetched rows that were actually used
}

impl CacheStats {
    /// Calculate hit rate
    pub fn hit_rate(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }
    
    /// ๐Ÿš€ P2: Calculate prefetch efficiency
    pub fn prefetch_efficiency(&self) -> f64 {
        if self.prefetch_triggered == 0 {
            0.0
        } else {
            self.prefetch_useful as f64 / self.prefetch_triggered as f64
        }
    }
}

/// ๐Ÿš€ P2: Prefetch configuration
#[derive(Debug, Clone)]
pub struct PrefetchConfig {
    /// Enable prefetching
    pub enabled: bool,
    /// Minimum sequential accesses to trigger prefetching
    pub min_sequential_count: usize,
    /// Number of rows to prefetch ahead
    pub prefetch_size: usize,
    /// Maximum stride (gap between consecutive IDs) to consider sequential
    pub max_stride: i64,
}

impl Default for PrefetchConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            min_sequential_count: 3,  // Trigger after 3 consecutive sequential accesses
            prefetch_size: 32,        // Prefetch 32 rows ahead
            max_stride: 100,          // Max gap of 100 to consider sequential
        }
    }
}

impl RowCache {
    /// Create a new row cache
    /// 
    /// # Arguments
    /// * `capacity` - Maximum number of rows to cache (default: 10000)
    /// 
    /// # Memory Usage
    /// Assuming 1KB/row average: 10000 rows โ‰ˆ 10MB
    pub fn new(capacity: usize) -> Self {
        Self::with_prefetch_config(capacity, PrefetchConfig::default())
    }
    
    /// ๐Ÿš€ P2: Create a new row cache with custom prefetch configuration
    pub fn with_prefetch_config(capacity: usize, prefetch_config: PrefetchConfig) -> Self {
        let capacity = capacity.max(1);  // Minimum 1 row (changed from 100 for testing)
        
        Self {
            cache: Arc::new(RwLock::new(
                LruCache::new(NonZeroUsize::new(capacity).unwrap())
            )),
            stats: Arc::new(RwLock::new(CacheStats {
                hits: 0,
                misses: 0,
                size: 0,
                capacity,
                prefetch_triggered: 0,
                prefetch_useful: 0,
            })),
            access_patterns: Arc::new(DashMap::new()),
            prefetch_config,
        }
    }
    
    /// Get a row from cache
    /// 
    /// Returns Some(Arc<Row>) if found (cache hit), None otherwise (cache miss)
    /// 
    /// ๐Ÿš€ P2: Also updates access pattern for prefetch detection
    pub fn get(&self, table_name: &str, row_id: RowId) -> Option<Arc<Row>> {
        let key = (table_name.to_string(), row_id);
        
        let mut cache = self.cache.write();
        
        if let Some(row) = cache.get(&key) {
            // Cache hit
            let mut stats = self.stats.write();
            stats.hits += 1;
            
            // ๐Ÿš€ P2: Update access pattern
            drop(stats);  // Release lock before pattern update
            self.update_access_pattern(table_name, row_id);
            
            Some(Arc::clone(row))
        } else {
            // Cache miss
            let mut stats = self.stats.write();
            stats.misses += 1;
            
            // ๐Ÿš€ P2: Update access pattern
            drop(stats);  // Release lock before pattern update
            self.update_access_pattern(table_name, row_id);
            
            None
        }
    }
    
    /// ๐Ÿš€ P2: Update access pattern and detect sequential scans
    /// 
    /// Returns prefetch recommendation: Some((start_row_id, count)) if prefetch should be triggered
    fn update_access_pattern(&self, table_name: &str, row_id: RowId) -> Option<(RowId, usize)> {
        if !self.prefetch_config.enabled {
            return None;
        }
        
        let now = std::time::Instant::now();
        
        // ๐Ÿš€ ไฝฟ็”จ DashMap entry API ่ฟ›่กŒๅŽŸๅญๆ›ดๆ–ฐ
        let should_prefetch = match self.access_patterns.entry(table_name.to_string()) {
            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
                let pattern = entry.get_mut();
                
                // Age out old patterns (>1 second old)
                if now.duration_since(pattern.last_access).as_secs() > 1 {
                    // Reset pattern
                    pattern.last_row_id = row_id;
                    pattern.stride = 0;
                    pattern.sequential_count = 1;
                    pattern.last_access = now;
                    return None;
                }
                
                let stride = row_id as i64 - pattern.last_row_id as i64;
                
                // Check if stride matches (sequential access)
                if stride == pattern.stride && stride.abs() <= self.prefetch_config.max_stride {
                    // Consecutive sequential access
                    pattern.sequential_count += 1;
                    pattern.last_row_id = row_id;
                    pattern.last_access = now;
                    
                    // Trigger prefetch if threshold reached
                    if pattern.sequential_count >= self.prefetch_config.min_sequential_count {
                        // Calculate prefetch range
                        let next_row_id = (row_id as i64 + stride) as RowId;
                        Some((next_row_id, self.prefetch_config.prefetch_size))
                    } else {
                        None
                    }
                } else if stride.abs() <= self.prefetch_config.max_stride {
                    // New stride detected, reset count
                    pattern.stride = stride;
                    pattern.sequential_count = 2;  // Current + previous
                    pattern.last_row_id = row_id;
                    pattern.last_access = now;
                    None
                } else {
                    // Random access, reset
                    pattern.stride = 0;
                    pattern.sequential_count = 1;
                    pattern.last_row_id = row_id;
                    pattern.last_access = now;
                    None
                }
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                // First access for this table
                entry.insert(AccessPattern {
                    last_row_id: row_id,
                    stride: 0,
                    sequential_count: 1,
                    last_access: now,
                });
                None
            }
        };
        
        should_prefetch
    }
    
    /// ๐Ÿš€ P2: Check if prefetch should be triggered for current access
    /// 
    /// Returns Some((start_row_id, count, stride)) if prefetch is recommended
    pub fn check_prefetch(&self, table_name: &str, row_id: RowId) -> Option<(RowId, usize, i64)> {
        if !self.prefetch_config.enabled {
            return None;
        }
        
        // ๐Ÿš€ ไฝฟ็”จ DashMap ็š„ get() ๆ–นๆณ•
        if let Some(pattern_ref) = self.access_patterns.get(table_name) {
            let pattern = pattern_ref.value();
            
            // Check if pattern is fresh (accessed within 1 second)
            if pattern.last_access.elapsed().as_secs() > 1 {
                return None;
            }
            
            // Check if enough sequential accesses observed
            if pattern.sequential_count >= self.prefetch_config.min_sequential_count {
                let stride = pattern.stride;
                let next_row_id = (row_id as i64 + stride) as RowId;
                
                return Some((next_row_id, self.prefetch_config.prefetch_size, stride));
            }
        }
        
        None
    }
    
    /// Put a row into cache
    /// 
    /// # Arguments
    /// * `table_name` - Table name
    /// * `row_id` - Row ID
    /// * `row` - Row data (will be wrapped in Arc for cheap cloning)
    pub fn put(&self, table_name: String, row_id: RowId, row: Row) {
        let key = (table_name, row_id);
        let row_arc = Arc::new(row);
        
        let mut cache = self.cache.write();
        cache.put(key, row_arc);
        
        // Update stats
        let mut stats = self.stats.write();
        stats.size = cache.len();
    }
    
    /// Batch put rows into cache
    /// 
    /// More efficient than calling put() multiple times
    pub fn put_batch(&self, table_name: &str, rows: Vec<(RowId, Row)>) {
        let mut cache = self.cache.write();
        
        for (row_id, row) in rows {
            let key = (table_name.to_string(), row_id);
            cache.put(key, Arc::new(row));
        }
        
        // Update stats
        let mut stats = self.stats.write();
        stats.size = cache.len();
    }
    
    /// Invalidate a single row
    /// 
    /// Called when row is updated or deleted
    pub fn invalidate(&self, table_name: &str, row_id: RowId) {
        let key = (table_name.to_string(), row_id);
        
        let mut cache = self.cache.write();
        cache.pop(&key);
        
        // Update stats
        let mut stats = self.stats.write();
        stats.size = cache.len();
    }
    
    /// Invalidate all rows for a table
    /// 
    /// Called when table is dropped or truncated
    pub fn invalidate_table(&self, table_name: &str) {
        let mut cache = self.cache.write();
        
        // Collect keys to remove (can't remove while iterating)
        let keys_to_remove: Vec<CacheKey> = cache
            .iter()
            .filter(|(key, _)| key.0 == table_name)
            .map(|(key, _)| key.clone())
            .collect();
        
        for key in keys_to_remove {
            cache.pop(&key);
        }
        
        // Update stats
        let mut stats = self.stats.write();
        stats.size = cache.len();
    }
    
    /// Clear entire cache
    pub fn clear(&self) {
        let mut cache = self.cache.write();
        cache.clear();
        
        let mut stats = self.stats.write();
        stats.size = 0;
        stats.hits = 0;
        stats.misses = 0;
        stats.prefetch_triggered = 0;
        stats.prefetch_useful = 0;
        
        // ๐Ÿš€ P2: Clear access patterns - DashMap ็š„ clear() ๆ–นๆณ•
        self.access_patterns.clear();
    }
    
    /// ๐Ÿš€ P2: Record that a prefetch was triggered
    pub fn record_prefetch(&self, count: usize) {
        let mut stats = self.stats.write();
        stats.prefetch_triggered += count as u64;
    }
    
    /// ๐Ÿš€ P2: Record that a prefetched row was actually used
    pub fn record_prefetch_hit(&self) {
        let mut stats = self.stats.write();
        stats.prefetch_useful += 1;
    }
    
    /// Get cache statistics
    pub fn stats(&self) -> CacheStats {
        self.stats.read().clone()
    }
    
    /// Print cache statistics
    pub fn print_stats(&self) {
        let stats = self.stats();
        println!("๐Ÿ“Š Row Cache Statistics:");
        println!("   Hits: {}, Misses: {}", stats.hits, stats.misses);
        println!("   Hit Rate: {:.2}%", stats.hit_rate() * 100.0);
        println!("   Size: {}/{} rows", stats.size, stats.capacity);
        
        // ๐Ÿš€ P2: Prefetch stats
        if self.prefetch_config.enabled {
            println!("๐Ÿš€ Prefetch Statistics:");
            println!("   Triggered: {} rows", stats.prefetch_triggered);
            println!("   Useful: {} rows", stats.prefetch_useful);
            println!("   Efficiency: {:.2}%", stats.prefetch_efficiency() * 100.0);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    
    #[test]
    fn test_row_cache_basic() {
        let cache = RowCache::new(100);
        
        // Create test row
        let mut row = Row::new();
        row.push(Value::Integer(1));
        row.push(Value::Text("test".to_string()));
        
        // Cache miss
        assert!(cache.get("users", 1).is_none());
        
        // Put into cache
        cache.put("users".to_string(), 1, row.clone());
        
        // Cache hit
        let cached_row = cache.get("users", 1).unwrap();
        assert_eq!(cached_row.len(), 2);
        
        // Stats check
        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 0.5);
    }
    
    #[test]
    fn test_row_cache_invalidation() {
        let cache = RowCache::new(100);
        
        let mut row = Row::new();
        row.push(Value::Integer(1));
        
        cache.put("users".to_string(), 1, row.clone());
        assert!(cache.get("users", 1).is_some());
        
        // Invalidate
        cache.invalidate("users", 1);
        assert!(cache.get("users", 1).is_none());
    }
    
    #[test]
    fn test_row_cache_lru_eviction() {
        let cache = RowCache::new(3);  // Small cache
        
        // Fill cache
        for i in 1..=3 {
            let mut row = Row::new();
            row.push(Value::Integer(i));
            cache.put("users".to_string(), i as u64, row);
        }
        
        // Cache is full
        let stats = cache.stats();
        assert_eq!(stats.size, 3);
        
        // Add one more -> LRU eviction
        let mut row = Row::new();
        row.push(Value::Integer(4));
        cache.put("users".to_string(), 4, row);
        
        // Size should still be 3
        let stats = cache.stats();
        assert_eq!(stats.size, 3);
        
        // Oldest entry (1) should be evicted
        assert!(cache.get("users", 1).is_none());
        assert!(cache.get("users", 4).is_some());
    }
}