sochdb_storage/
queue_index.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Queue-Optimized Index Policy
16//!
17//! This module extends the per-table index policy with queue-specific
18//! optimizations that ensure efficient priority queue operations.
19//!
20//! ## Queue Access Patterns
21//!
22//! Queues have specific access patterns that differ from general tables:
23//!
24//! | Operation | Pattern                              | Requirement          |
25//! |-----------|--------------------------------------|----------------------|
26//! | Enqueue   | Insert at any position               | O(log N) or better   |
27//! | Dequeue   | Find minimum key, delete it          | O(log N) find + O(1) delete |
28//! | Peek      | Read minimum key without deletion    | O(log N)             |
29//! | Count     | Get queue size                       | O(1)                 |
30//!
31//! ## Why Queue Tables Need ScanOptimized Policy
32//!
33//! The dequeue operation requires "find minimum key", which is:
34//! - O(log N) with ordered index (ScanOptimized)
35//! - O(N) without ordered index (WriteOptimized/Balanced with deferred sort)
36//!
37//! For a queue with 10,000 tasks:
38//! - With ScanOptimized: ~14 comparisons per dequeue
39//! - With WriteOptimized: ~10,000 comparisons per dequeue
40//!
41//! ## Avoiding Deferred-Sort Latency Spikes
42//!
43//! The Balanced policy uses "deferred sorting" where writes are O(1) append
44//! and scans trigger O(N log N) sort-on-demand. This creates latency spikes:
45//!
46//! ```text
47//! Pop #1: 0.1ms (memtable small)
48//! Pop #2: 0.1ms
49//! ...
50//! Pop #1000: 50ms (sort triggered!) ← Latency spike
51//! Pop #1001: 0.2ms (now sorted)
52//! ```
53//!
54//! ScanOptimized maintains order on every write, giving predictable latency.
55//!
56//! ## Queue Index Configuration
57//!
//! ```rust,ignore
//! let config = QueueIndexConfig::new("task_queue")
//!     .with_priority_column("priority")
//!     .with_timestamp_column("ready_at")
//!     .with_fifo_column("sequence");
//! ```
65
66use std::collections::BTreeMap;
67use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
68use std::sync::Arc;
69
70use parking_lot::RwLock;
71
72use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
73use crate::key_buffer::ArenaKeyHandle;
74
75// ============================================================================
76// QueueIndexConfig - Queue-Specific Configuration
77// ============================================================================
78
79/// Configuration for queue tables
80#[derive(Debug, Clone)]
81pub struct QueueIndexConfig {
82    /// Base table configuration
83    pub base: TableIndexConfig,
84    /// Name of the priority column (for composite key ordering)
85    pub priority_column: Option<String>,
86    /// Name of the timestamp column (for ready-time ordering)
87    pub timestamp_column: Option<String>,
88    /// Name of the sequence column (for FIFO within same priority)
89    pub fifo_column: Option<String>,
90    /// Whether to maintain min-key cache for O(1) peek
91    pub enable_min_key_cache: bool,
92    /// Whether to track queue size for O(1) count
93    pub enable_size_tracking: bool,
94}
95
96impl QueueIndexConfig {
97    /// Create a new queue index config
98    pub fn new(queue_name: impl Into<String>) -> Self {
99        Self {
100            base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
101            priority_column: None,
102            timestamp_column: None,
103            fifo_column: None,
104            enable_min_key_cache: true,
105            enable_size_tracking: true,
106        }
107    }
108
109    /// Set the priority column name
110    pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
111        self.priority_column = Some(column.into());
112        self
113    }
114
115    /// Set the timestamp column name
116    pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
117        self.timestamp_column = Some(column.into());
118        self
119    }
120
121    /// Set the FIFO sequence column name
122    pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
123        self.fifo_column = Some(column.into());
124        self
125    }
126
127    /// Enable or disable min-key cache
128    pub fn with_min_key_cache(mut self, enable: bool) -> Self {
129        self.enable_min_key_cache = enable;
130        self
131    }
132
133    /// Enable or disable size tracking
134    pub fn with_size_tracking(mut self, enable: bool) -> Self {
135        self.enable_size_tracking = enable;
136        self
137    }
138
139    /// Get the composite key columns for this queue
140    pub fn key_columns(&self) -> Vec<&str> {
141        let mut columns = Vec::new();
142        if let Some(ref col) = self.priority_column {
143            columns.push(col.as_str());
144        }
145        if let Some(ref col) = self.timestamp_column {
146            columns.push(col.as_str());
147        }
148        if let Some(ref col) = self.fifo_column {
149            columns.push(col.as_str());
150        }
151        columns
152    }
153}
154
155// ============================================================================
156// QueueIndex - Queue-Optimized Index Structure
157// ============================================================================
158
159/// A queue-optimized ordered index
160///
161/// This provides efficient priority queue operations by:
162/// 1. Maintaining a BTreeMap for O(log N) min-key access
163/// 2. Caching the minimum key for O(1) peek
164/// 3. Tracking size for O(1) count
165///
166/// ## Internal Structure
167///
168/// ```text
169/// ┌─────────────────────────────────────────────────────────────────────┐
170/// │                         QueueIndex                                   │
171/// ├─────────────────────────────────────────────────────────────────────┤
172/// │ entries: BTreeMap<CompositeKey, Value>  ← O(log N) ordered ops      │
173/// │ min_key_cache: Option<CompositeKey>     ← O(1) peek                 │
174/// │ size: AtomicUsize                       ← O(1) count                │
175/// │ version: AtomicU64                      ← For cache invalidation    │
176/// └─────────────────────────────────────────────────────────────────────┘
177/// ```
178pub struct QueueIndex<V: Clone + Send + Sync> {
179    /// The ordered entries
180    entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
181    /// Cached minimum key (invalidated on mutation)
182    min_key_cache: RwLock<Option<CompositeQueueKey>>,
183    /// Current size
184    size: AtomicUsize,
185    /// Version counter for cache invalidation
186    version: AtomicU64,
187    /// Configuration
188    config: QueueIndexConfig,
189}
190
/// Ordering key for queue entries
///
/// The derived `Ord` compares fields in declaration order: `priority`, then
/// `timestamp`, then `sequence`, and finally `task_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    /// First sort field: priority (lower = more urgent)
    pub priority: i64,
    /// Second sort field: ready timestamp
    pub timestamp: u64,
    /// Third sort field: sequence number (FIFO within same priority/time)
    pub sequence: u64,
    /// Task identifier; also the final tie-breaker in the ordering
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Flipping the sign bit maps `i64` onto `u64` so that unsigned order
    /// equals signed order (`i64::MIN` -> 0, `i64::MAX` -> `u64::MAX`).
    const SIGN_BIT: u64 = 1u64 << 63;

    /// Length of the fixed-width prefix: three big-endian 8-byte words.
    const HEADER_LEN: usize = 24;

    /// Build a key from its four components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serialize the key to bytes.
    ///
    /// Layout: order-preserving priority (8 bytes, big-endian), timestamp
    /// (8 bytes, big-endian), sequence (8 bytes, big-endian), followed by the
    /// raw UTF-8 bytes of the task id.
    pub fn encode(&self) -> Vec<u8> {
        let id_bytes = self.task_id.as_bytes();
        let ordered_priority = (self.priority as u64) ^ Self::SIGN_BIT;

        let mut out = Vec::with_capacity(32 + id_bytes.len());
        for word in &[ordered_priority, self.timestamp, self.sequence] {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(id_bytes);
        out
    }

    /// Parse a key from the layout produced by `encode`.
    ///
    /// Returns `None` when the input is shorter than the 24-byte prefix or
    /// when the bytes after the prefix are not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::HEADER_LEN {
            return None;
        }
        let (header, id_bytes) = bytes.split_at(Self::HEADER_LEN);

        // Reads the `index`-th big-endian u64 out of the fixed-width prefix.
        let read_word = |index: usize| -> u64 {
            let mut word = [0u8; 8];
            word.copy_from_slice(&header[index * 8..(index + 1) * 8]);
            u64::from_be_bytes(word)
        };

        let priority = (read_word(0) ^ Self::SIGN_BIT) as i64;
        let timestamp = read_word(1);
        let sequence = read_word(2);
        let task_id = std::str::from_utf8(id_bytes).ok()?.to_owned();

        Some(Self {
            priority,
            timestamp,
            sequence,
            task_id,
        })
    }
}
258
259impl<V: Clone + Send + Sync> QueueIndex<V> {
260    /// Create a new queue index
261    pub fn new(config: QueueIndexConfig) -> Self {
262        Self {
263            entries: RwLock::new(BTreeMap::new()),
264            min_key_cache: RwLock::new(None),
265            size: AtomicUsize::new(0),
266            version: AtomicU64::new(0),
267            config,
268        }
269    }
270
271    /// Insert a task into the queue
272    ///
273    /// Complexity: O(log N)
274    pub fn insert(&self, key: CompositeQueueKey, value: V) {
275        let is_new_min = {
276            let entries = self.entries.read();
277            entries.first_key_value()
278                .map(|(min, _)| &key < min)
279                .unwrap_or(true)
280        };
281        
282        {
283            let mut entries = self.entries.write();
284            let was_absent = entries.insert(key.clone(), value).is_none();
285            
286            if was_absent {
287                self.size.fetch_add(1, Ordering::Relaxed);
288            }
289        }
290        
291        // Update min cache if this is the new minimum
292        if is_new_min && self.config.enable_min_key_cache {
293            *self.min_key_cache.write() = Some(key);
294        }
295        
296        self.version.fetch_add(1, Ordering::Release);
297    }
298
299    /// Peek at the minimum key without removing it
300    ///
301    /// Complexity: O(1) if cache hit, O(log N) if cache miss
302    pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
303        // Try cache first
304        if self.config.enable_min_key_cache {
305            let cache = self.min_key_cache.read();
306            if let Some(ref cached_key) = *cache {
307                let entries = self.entries.read();
308                if let Some(value) = entries.get(cached_key) {
309                    return Some((cached_key.clone(), value.clone()));
310                }
311            }
312        }
313        
314        // Cache miss - scan
315        let entries = self.entries.read();
316        let result = entries.first_key_value()
317            .map(|(k, v)| (k.clone(), v.clone()));
318        
319        // Update cache
320        if self.config.enable_min_key_cache {
321            if let Some((ref key, _)) = result {
322                *self.min_key_cache.write() = Some(key.clone());
323            }
324        }
325        
326        result
327    }
328
329    /// Remove and return the minimum entry
330    ///
331    /// Complexity: O(log N)
332    pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
333        let result = {
334            let mut entries = self.entries.write();
335            entries.pop_first()
336        };
337        
338        if result.is_some() {
339            self.size.fetch_sub(1, Ordering::Relaxed);
340            
341            // Invalidate cache
342            if self.config.enable_min_key_cache {
343                *self.min_key_cache.write() = None;
344            }
345            
346            self.version.fetch_add(1, Ordering::Release);
347        }
348        
349        result
350    }
351
352    /// Remove a specific entry by key
353    ///
354    /// Complexity: O(log N)
355    pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
356        let result = {
357            let mut entries = self.entries.write();
358            entries.remove(key)
359        };
360        
361        if result.is_some() {
362            self.size.fetch_sub(1, Ordering::Relaxed);
363            
364            // Invalidate cache if we removed the cached min
365            if self.config.enable_min_key_cache {
366                let should_invalidate = {
367                    let cache = self.min_key_cache.read();
368                    cache.as_ref().map(|c| c == key).unwrap_or(false)
369                };
370                if should_invalidate {
371                    *self.min_key_cache.write() = None;
372                }
373            }
374            
375            self.version.fetch_add(1, Ordering::Release);
376        }
377        
378        result
379    }
380
381    /// Get an entry by key
382    ///
383    /// Complexity: O(log N)
384    pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
385        self.entries.read().get(key).cloned()
386    }
387
388    /// Check if a key exists
389    ///
390    /// Complexity: O(log N)
391    pub fn contains(&self, key: &CompositeQueueKey) -> bool {
392        self.entries.read().contains_key(key)
393    }
394
395    /// Get queue size
396    ///
397    /// Complexity: O(1)
398    pub fn len(&self) -> usize {
399        if self.config.enable_size_tracking {
400            self.size.load(Ordering::Relaxed)
401        } else {
402            self.entries.read().len()
403        }
404    }
405
406    /// Check if queue is empty
407    ///
408    /// Complexity: O(1)
409    pub fn is_empty(&self) -> bool {
410        self.len() == 0
411    }
412
413    /// Get current version (for change detection)
414    pub fn version(&self) -> u64 {
415        self.version.load(Ordering::Acquire)
416    }
417
418    /// Scan entries with priority <= threshold
419    ///
420    /// Useful for batch processing of high-priority tasks.
421    ///
422    /// Complexity: O(log N + K) where K is result count
423    pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
424        let entries = self.entries.read();
425        
426        entries.iter()
427            .take_while(|(k, _)| k.priority <= max_priority)
428            .take(limit)
429            .map(|(k, v)| (k.clone(), v.clone()))
430            .collect()
431    }
432
433    /// Scan entries ready at or before the given timestamp
434    ///
435    /// Complexity: O(N) in worst case, but typically O(K) if data is time-ordered
436    pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
437        let entries = self.entries.read();
438        
439        entries.iter()
440            .filter(|(k, _)| k.timestamp <= now)
441            .take(limit)
442            .map(|(k, v)| (k.clone(), v.clone()))
443            .collect()
444    }
445
446    /// Get the configuration
447    pub fn config(&self) -> &QueueIndexConfig {
448        &self.config
449    }
450}
451
452// ============================================================================
453// QueueTableRegistry - Queue-Aware Table Registry
454// ============================================================================
455
456/// Registry extension for queue tables
457pub struct QueueTableRegistry {
458    /// Base registry
459    base: TableIndexRegistry,
460    /// Queue-specific configs
461    queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
462}
463
464impl QueueTableRegistry {
465    /// Create a new registry
466    pub fn new() -> Self {
467        Self {
468            base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
469            queue_configs: RwLock::new(std::collections::HashMap::new()),
470        }
471    }
472
473    /// Register a table as a queue
474    pub fn register_queue(&self, config: QueueIndexConfig) {
475        // Register base config with ScanOptimized policy
476        self.base.configure_table(config.base.clone());
477        
478        // Store queue-specific config
479        self.queue_configs.write().insert(
480            config.base.table_name.clone(),
481            config,
482        );
483    }
484
485    /// Check if a table is registered as a queue
486    pub fn is_queue(&self, table_name: &str) -> bool {
487        self.queue_configs.read().contains_key(table_name)
488    }
489
490    /// Get queue config
491    pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
492        self.queue_configs.read().get(table_name).cloned()
493    }
494
495    /// Get the base registry
496    pub fn base(&self) -> &TableIndexRegistry {
497        &self.base
498    }
499}
500
501impl Default for QueueTableRegistry {
502    fn default() -> Self {
503        Self::new()
504    }
505}
506
507// ============================================================================
508// QueueStats - Queue Statistics
509// ============================================================================
510
/// Counters describing the activity of a queue index
///
/// `Default` yields all-zero counters.
#[derive(Clone, Debug, Default)]
pub struct QueueIndexStats {
    /// Number of entries currently held
    pub size: usize,
    /// Count of insert operations
    pub inserts: u64,
    /// Count of pop operations
    pub pops: u64,
    /// Count of peek operations
    pub peeks: u64,
    /// Hit rate of the min-key cache for peek operations
    pub cache_hit_rate: f64,
}
525
526// ============================================================================
527// Tests
528// ============================================================================
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty index over a default-configured "test_queue".
    fn empty_index<V: Clone + Send + Sync>() -> QueueIndex<V> {
        QueueIndex::new(QueueIndexConfig::new("test_queue"))
    }

    #[test]
    fn test_composite_key_ordering() {
        let base = CompositeQueueKey::new(1, 100, 1, "task1");
        let higher_priority_value = CompositeQueueKey::new(2, 100, 1, "task2");
        let later_timestamp = CompositeQueueKey::new(1, 200, 1, "task3");
        let later_sequence = CompositeQueueKey::new(1, 100, 2, "task4");

        // Priority is compared first: the lower number sorts first
        assert!(base < higher_priority_value);

        // With equal priority, the earlier timestamp sorts first
        assert!(base < later_timestamp);

        // With equal priority and timestamp, the lower sequence sorts first
        assert!(base < later_sequence);
    }

    #[test]
    fn test_composite_key_encode_decode() {
        let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");
        let round_tripped = CompositeQueueKey::decode(&original.encode()).unwrap();

        assert_eq!(round_tripped.priority, original.priority);
        assert_eq!(round_tripped.timestamp, original.timestamp);
        assert_eq!(round_tripped.sequence, original.sequence);
        assert_eq!(round_tripped.task_id, original.task_id);
    }

    #[test]
    fn test_queue_index_insert_pop() {
        let index: QueueIndex<String> = empty_index();

        // Deliberately inserted out of priority order
        index.insert(CompositeQueueKey::new(3, 100, 1, "low"), "low priority".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "high"), "high priority".to_string());
        index.insert(CompositeQueueKey::new(2, 100, 1, "medium"), "medium priority".to_string());

        assert_eq!(index.len(), 3);

        // The most urgent entry (lowest priority number) comes out first
        let (first_key, first_value) = index.pop_min().unwrap();
        assert_eq!(first_key.priority, 1);
        assert_eq!(first_value, "high priority");

        let (second_key, _) = index.pop_min().unwrap();
        assert_eq!(second_key.priority, 2);

        let (third_key, _) = index.pop_min().unwrap();
        assert_eq!(third_key.priority, 3);

        assert!(index.is_empty());
    }

    #[test]
    fn test_queue_index_peek() {
        let index: QueueIndex<i32> = empty_index();

        index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
        index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);

        // Peeking yields the minimum entry and leaves it in place
        let (min_key, min_value) = index.peek_min().unwrap();
        assert_eq!(min_key.priority, 1);
        assert_eq!(min_value, 2);

        // Nothing was removed
        assert_eq!(index.len(), 2);

        // A second peek (expected to be served via the cache) agrees
        let (min_key_again, _) = index.peek_min().unwrap();
        assert_eq!(min_key_again.priority, 1);
    }

    #[test]
    fn test_queue_index_remove() {
        let index: QueueIndex<i32> = empty_index();

        let first = CompositeQueueKey::new(1, 100, 1, "task1");
        let second = CompositeQueueKey::new(2, 100, 1, "task2");

        index.insert(first.clone(), 1);
        index.insert(second.clone(), 2);

        // Targeted removal returns the stored value and shrinks the queue
        assert_eq!(index.remove(&first), Some(1));
        assert_eq!(index.len(), 1);

        // The surviving entry is what pop now returns
        let (remaining_key, _) = index.pop_min().unwrap();
        assert_eq!(remaining_key.task_id, "task2");
    }

    #[test]
    fn test_scan_by_priority() {
        let index: QueueIndex<i32> = empty_index();

        for i in 1..=10 {
            index.insert(CompositeQueueKey::new(i, 100, 1, format!("task{}", i)), i as i32);
        }

        // Only priorities 1, 2 and 3 are at or below the threshold, in order
        let priorities: Vec<i64> = index
            .scan_by_priority(3, 100)
            .iter()
            .map(|(key, _)| key.priority)
            .collect();
        assert_eq!(priorities, vec![1, 2, 3]);
    }

    #[test]
    fn test_scan_ready() {
        let index: QueueIndex<i32> = empty_index();

        // Three tasks that become ready at different times
        index.insert(CompositeQueueKey::new(1, 100, 1, "ready1"), 1);
        index.insert(CompositeQueueKey::new(1, 200, 1, "ready2"), 2);
        index.insert(CompositeQueueKey::new(1, 300, 1, "future"), 3);

        // At timestamp 200 only the first two are ready
        let ready = index.scan_ready(200, 100);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();

        registry.register_queue(
            QueueIndexConfig::new("task_queue")
                .with_priority_column("priority")
                .with_timestamp_column("ready_at"),
        );

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let stored = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(stored.priority_column, Some("priority".to_string()));
    }

    #[test]
    fn test_fifo_within_priority() {
        let index: QueueIndex<String> = empty_index();

        // Same priority and timestamp; only the sequence differs
        index.insert(CompositeQueueKey::new(1, 100, 3, "third"), "third".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "first"), "first".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 2, "second"), "second".to_string());

        // Entries come out in sequence order (FIFO)
        let (_, v1) = index.pop_min().unwrap();
        let (_, v2) = index.pop_min().unwrap();
        let (_, v3) = index.pop_min().unwrap();

        assert_eq!(v1, "first");
        assert_eq!(v2, "second");
        assert_eq!(v3, "third");
    }
}