Skip to main content

sochdb_storage/
queue_index.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Queue-Optimized Index Policy
19//!
20//! This module extends the per-table index policy with queue-specific
21//! optimizations that ensure efficient priority queue operations.
22//!
23//! ## Queue Access Patterns
24//!
25//! Queues have specific access patterns that differ from general tables:
26//!
27//! | Operation | Pattern                              | Requirement          |
28//! |-----------|--------------------------------------|----------------------|
29//! | Enqueue   | Insert at any position               | O(log N) or better   |
30//! | Dequeue   | Find minimum key, delete it          | O(log N) find + O(1) delete |
31//! | Peek      | Read minimum key without deletion    | O(log N)             |
32//! | Count     | Get queue size                       | O(1)                 |
33//!
34//! ## Why Queue Tables Need ScanOptimized Policy
35//!
36//! The dequeue operation requires "find minimum key", which is:
37//! - O(log N) with ordered index (ScanOptimized)
38//! - O(N) without ordered index (WriteOptimized/Balanced with deferred sort)
39//!
40//! For a queue with 10,000 tasks:
41//! - With ScanOptimized: ~14 comparisons per dequeue
42//! - With WriteOptimized: ~10,000 comparisons per dequeue
43//!
44//! ## Avoiding Deferred-Sort Latency Spikes
45//!
46//! The Balanced policy uses "deferred sorting" where writes are O(1) append
47//! and scans trigger O(N log N) sort-on-demand. This creates latency spikes:
48//!
49//! ```text
50//! Pop #1: 0.1ms (memtable small)
51//! Pop #2: 0.1ms
52//! ...
53//! Pop #1000: 50ms (sort triggered!) ← Latency spike
54//! Pop #1001: 0.2ms (now sorted)
55//! ```
56//!
57//! ScanOptimized maintains order on every write, giving predictable latency.
58//!
59//! ## Queue Index Configuration
60//!
//! ```ignore
//! let config = QueueIndexConfig::new("task_queue")
//!     .with_priority_column("priority")
//!     .with_timestamp_column("ready_at")
//!     .with_fifo_column("sequence");
//! ```
68
69use std::collections::BTreeMap;
70use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
71use std::sync::Arc;
72
73use parking_lot::RwLock;
74
75use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
76use crate::key_buffer::ArenaKeyHandle;
77
78// ============================================================================
79// QueueIndexConfig - Queue-Specific Configuration
80// ============================================================================
81
82/// Configuration for queue tables
83#[derive(Debug, Clone)]
84pub struct QueueIndexConfig {
85    /// Base table configuration
86    pub base: TableIndexConfig,
87    /// Name of the priority column (for composite key ordering)
88    pub priority_column: Option<String>,
89    /// Name of the timestamp column (for ready-time ordering)
90    pub timestamp_column: Option<String>,
91    /// Name of the sequence column (for FIFO within same priority)
92    pub fifo_column: Option<String>,
93    /// Whether to maintain min-key cache for O(1) peek
94    pub enable_min_key_cache: bool,
95    /// Whether to track queue size for O(1) count
96    pub enable_size_tracking: bool,
97}
98
99impl QueueIndexConfig {
100    /// Create a new queue index config
101    pub fn new(queue_name: impl Into<String>) -> Self {
102        Self {
103            base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
104            priority_column: None,
105            timestamp_column: None,
106            fifo_column: None,
107            enable_min_key_cache: true,
108            enable_size_tracking: true,
109        }
110    }
111
112    /// Set the priority column name
113    pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
114        self.priority_column = Some(column.into());
115        self
116    }
117
118    /// Set the timestamp column name
119    pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
120        self.timestamp_column = Some(column.into());
121        self
122    }
123
124    /// Set the FIFO sequence column name
125    pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
126        self.fifo_column = Some(column.into());
127        self
128    }
129
130    /// Enable or disable min-key cache
131    pub fn with_min_key_cache(mut self, enable: bool) -> Self {
132        self.enable_min_key_cache = enable;
133        self
134    }
135
136    /// Enable or disable size tracking
137    pub fn with_size_tracking(mut self, enable: bool) -> Self {
138        self.enable_size_tracking = enable;
139        self
140    }
141
142    /// Get the composite key columns for this queue
143    pub fn key_columns(&self) -> Vec<&str> {
144        let mut columns = Vec::new();
145        if let Some(ref col) = self.priority_column {
146            columns.push(col.as_str());
147        }
148        if let Some(ref col) = self.timestamp_column {
149            columns.push(col.as_str());
150        }
151        if let Some(ref col) = self.fifo_column {
152            columns.push(col.as_str());
153        }
154        columns
155    }
156}
157
158// ============================================================================
159// QueueIndex - Queue-Optimized Index Structure
160// ============================================================================
161
162/// A queue-optimized ordered index
163///
164/// This provides efficient priority queue operations by:
165/// 1. Maintaining a BTreeMap for O(log N) min-key access
166/// 2. Caching the minimum key for O(1) peek
167/// 3. Tracking size for O(1) count
168///
169/// ## Internal Structure
170///
171/// ```text
172/// ┌─────────────────────────────────────────────────────────────────────┐
173/// │                         QueueIndex                                   │
174/// ├─────────────────────────────────────────────────────────────────────┤
175/// │ entries: BTreeMap<CompositeKey, Value>  ← O(log N) ordered ops      │
176/// │ min_key_cache: Option<CompositeKey>     ← O(1) peek                 │
177/// │ size: AtomicUsize                       ← O(1) count                │
178/// │ version: AtomicU64                      ← For cache invalidation    │
179/// └─────────────────────────────────────────────────────────────────────┘
180/// ```
181pub struct QueueIndex<V: Clone + Send + Sync> {
182    /// The ordered entries
183    entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
184    /// Cached minimum key (invalidated on mutation)
185    min_key_cache: RwLock<Option<CompositeQueueKey>>,
186    /// Current size
187    size: AtomicUsize,
188    /// Version counter for cache invalidation
189    version: AtomicU64,
190    /// Configuration
191    config: QueueIndexConfig,
192}
193
/// Composite key that orders queue entries.
///
/// The derived `Ord` compares fields in declaration order: `priority`, then
/// `timestamp`, then `sequence`, then `task_id`. [`CompositeQueueKey::encode`]
/// emits bytes whose lexicographic order agrees with that ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    /// Primary sort key: priority (lower value = more urgent).
    pub priority: i64,
    /// Secondary sort key: ready timestamp.
    pub timestamp: u64,
    /// Tertiary sort key: sequence number, giving FIFO order within the same
    /// priority and timestamp.
    pub sequence: u64,
    /// Task identifier; the final tie-breaker.
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Length of the fixed-width prefix: three big-endian 64-bit words.
    const HEADER_LEN: usize = 24;
    /// XOR-ing with this bit maps `i64` onto `u64` monotonically
    /// (`i64::MIN` -> `0`, `i64::MAX` -> `u64::MAX`).
    const SIGN_BIT: u64 = 1 << 63;

    /// Build a key from its components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serialize as `priority | timestamp | sequence | task_id`.
    ///
    /// The numeric fields are fixed-width big-endian words (priority with its
    /// sign bit flipped so negative values sort first), followed by the raw
    /// UTF-8 bytes of `task_id`.
    pub fn encode(&self) -> Vec<u8> {
        // `as u64` is a deliberate bit reinterpretation; the XOR then shifts
        // the signed range onto the unsigned range without changing order.
        let priority_bits = (self.priority as u64) ^ Self::SIGN_BIT;

        let mut out = Vec::with_capacity(32 + self.task_id.len());
        for word in [priority_bits, self.timestamp, self.sequence] {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(self.task_id.as_bytes());
        out
    }

    /// Parse bytes produced by [`CompositeQueueKey::encode`].
    ///
    /// Returns `None` when the input is shorter than the 24-byte numeric
    /// prefix or when the trailing task id is not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::HEADER_LEN {
            return None;
        }

        let (header, tail) = bytes.split_at(Self::HEADER_LEN);
        let word_at = |offset: usize| -> Option<u64> {
            header[offset..offset + 8]
                .try_into()
                .ok()
                .map(u64::from_be_bytes)
        };

        let priority = (word_at(0)? ^ Self::SIGN_BIT) as i64;
        let timestamp = word_at(8)?;
        let sequence = word_at(16)?;
        let task_id = String::from_utf8(tail.to_vec()).ok()?;

        Some(Self {
            priority,
            timestamp,
            sequence,
            task_id,
        })
    }
}
261
262impl<V: Clone + Send + Sync> QueueIndex<V> {
263    /// Create a new queue index
264    pub fn new(config: QueueIndexConfig) -> Self {
265        Self {
266            entries: RwLock::new(BTreeMap::new()),
267            min_key_cache: RwLock::new(None),
268            size: AtomicUsize::new(0),
269            version: AtomicU64::new(0),
270            config,
271        }
272    }
273
274    /// Insert a task into the queue
275    ///
276    /// Complexity: O(log N)
277    pub fn insert(&self, key: CompositeQueueKey, value: V) {
278        let is_new_min = {
279            let entries = self.entries.read();
280            entries.first_key_value()
281                .map(|(min, _)| &key < min)
282                .unwrap_or(true)
283        };
284        
285        {
286            let mut entries = self.entries.write();
287            let was_absent = entries.insert(key.clone(), value).is_none();
288            
289            if was_absent {
290                self.size.fetch_add(1, Ordering::Relaxed);
291            }
292        }
293        
294        // Update min cache if this is the new minimum
295        if is_new_min && self.config.enable_min_key_cache {
296            *self.min_key_cache.write() = Some(key);
297        }
298        
299        self.version.fetch_add(1, Ordering::Release);
300    }
301
302    /// Peek at the minimum key without removing it
303    ///
304    /// Complexity: O(1) if cache hit, O(log N) if cache miss
305    pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
306        // Try cache first
307        if self.config.enable_min_key_cache {
308            let cache = self.min_key_cache.read();
309            if let Some(ref cached_key) = *cache {
310                let entries = self.entries.read();
311                if let Some(value) = entries.get(cached_key) {
312                    return Some((cached_key.clone(), value.clone()));
313                }
314            }
315        }
316        
317        // Cache miss - scan
318        let entries = self.entries.read();
319        let result = entries.first_key_value()
320            .map(|(k, v)| (k.clone(), v.clone()));
321        
322        // Update cache
323        if self.config.enable_min_key_cache {
324            if let Some((ref key, _)) = result {
325                *self.min_key_cache.write() = Some(key.clone());
326            }
327        }
328        
329        result
330    }
331
332    /// Remove and return the minimum entry
333    ///
334    /// Complexity: O(log N)
335    pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
336        let result = {
337            let mut entries = self.entries.write();
338            entries.pop_first()
339        };
340        
341        if result.is_some() {
342            self.size.fetch_sub(1, Ordering::Relaxed);
343            
344            // Invalidate cache
345            if self.config.enable_min_key_cache {
346                *self.min_key_cache.write() = None;
347            }
348            
349            self.version.fetch_add(1, Ordering::Release);
350        }
351        
352        result
353    }
354
355    /// Remove a specific entry by key
356    ///
357    /// Complexity: O(log N)
358    pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
359        let result = {
360            let mut entries = self.entries.write();
361            entries.remove(key)
362        };
363        
364        if result.is_some() {
365            self.size.fetch_sub(1, Ordering::Relaxed);
366            
367            // Invalidate cache if we removed the cached min
368            if self.config.enable_min_key_cache {
369                let should_invalidate = {
370                    let cache = self.min_key_cache.read();
371                    cache.as_ref().map(|c| c == key).unwrap_or(false)
372                };
373                if should_invalidate {
374                    *self.min_key_cache.write() = None;
375                }
376            }
377            
378            self.version.fetch_add(1, Ordering::Release);
379        }
380        
381        result
382    }
383
384    /// Get an entry by key
385    ///
386    /// Complexity: O(log N)
387    pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
388        self.entries.read().get(key).cloned()
389    }
390
391    /// Check if a key exists
392    ///
393    /// Complexity: O(log N)
394    pub fn contains(&self, key: &CompositeQueueKey) -> bool {
395        self.entries.read().contains_key(key)
396    }
397
398    /// Get queue size
399    ///
400    /// Complexity: O(1)
401    pub fn len(&self) -> usize {
402        if self.config.enable_size_tracking {
403            self.size.load(Ordering::Relaxed)
404        } else {
405            self.entries.read().len()
406        }
407    }
408
409    /// Check if queue is empty
410    ///
411    /// Complexity: O(1)
412    pub fn is_empty(&self) -> bool {
413        self.len() == 0
414    }
415
416    /// Get current version (for change detection)
417    pub fn version(&self) -> u64 {
418        self.version.load(Ordering::Acquire)
419    }
420
421    /// Scan entries with priority <= threshold
422    ///
423    /// Useful for batch processing of high-priority tasks.
424    ///
425    /// Complexity: O(log N + K) where K is result count
426    pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
427        let entries = self.entries.read();
428        
429        entries.iter()
430            .take_while(|(k, _)| k.priority <= max_priority)
431            .take(limit)
432            .map(|(k, v)| (k.clone(), v.clone()))
433            .collect()
434    }
435
436    /// Scan entries ready at or before the given timestamp
437    ///
438    /// Complexity: O(N) in worst case, but typically O(K) if data is time-ordered
439    pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
440        let entries = self.entries.read();
441        
442        entries.iter()
443            .filter(|(k, _)| k.timestamp <= now)
444            .take(limit)
445            .map(|(k, v)| (k.clone(), v.clone()))
446            .collect()
447    }
448
449    /// Get the configuration
450    pub fn config(&self) -> &QueueIndexConfig {
451        &self.config
452    }
453}
454
455// ============================================================================
456// QueueTableRegistry - Queue-Aware Table Registry
457// ============================================================================
458
459/// Registry extension for queue tables
460pub struct QueueTableRegistry {
461    /// Base registry
462    base: TableIndexRegistry,
463    /// Queue-specific configs
464    queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
465}
466
467impl QueueTableRegistry {
468    /// Create a new registry
469    pub fn new() -> Self {
470        Self {
471            base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
472            queue_configs: RwLock::new(std::collections::HashMap::new()),
473        }
474    }
475
476    /// Register a table as a queue
477    pub fn register_queue(&self, config: QueueIndexConfig) {
478        // Register base config with ScanOptimized policy
479        self.base.configure_table(config.base.clone());
480        
481        // Store queue-specific config
482        self.queue_configs.write().insert(
483            config.base.table_name.clone(),
484            config,
485        );
486    }
487
488    /// Check if a table is registered as a queue
489    pub fn is_queue(&self, table_name: &str) -> bool {
490        self.queue_configs.read().contains_key(table_name)
491    }
492
493    /// Get queue config
494    pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
495        self.queue_configs.read().get(table_name).cloned()
496    }
497
498    /// Get the base registry
499    pub fn base(&self) -> &TableIndexRegistry {
500        &self.base
501    }
502}
503
504impl Default for QueueTableRegistry {
505    fn default() -> Self {
506        Self::new()
507    }
508}
509
510// ============================================================================
511// QueueStats - Queue Statistics
512// ============================================================================
513
/// Plain snapshot of queue-index counters.
///
/// `Default` yields all-zero counters and a `0.0` hit rate.
#[derive(Clone, Debug, Default)]
pub struct QueueIndexStats {
    /// Number of entries currently in the queue.
    pub size: usize,
    /// Total insert operations.
    pub inserts: u64,
    /// Total pop operations.
    pub pops: u64,
    /// Total peek operations.
    pub peeks: u64,
    /// Fraction of peeks answered from the min-key cache.
    pub cache_hit_rate: f64,
}
528
529// ============================================================================
530// Tests
531// ============================================================================
532
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_composite_key_ordering() {
        let k1 = CompositeQueueKey::new(1, 100, 1, "task1");
        let k2 = CompositeQueueKey::new(2, 100, 1, "task2");
        let k3 = CompositeQueueKey::new(1, 200, 1, "task3");
        let k4 = CompositeQueueKey::new(1, 100, 2, "task4");

        // Lower priority number (more urgent) comes first
        assert!(k1 < k2);

        // Same priority, earlier timestamp comes first
        assert!(k1 < k3);

        // Same priority and timestamp, lower sequence comes first
        assert!(k1 < k4);
    }

    #[test]
    fn test_composite_key_encode_decode() {
        let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");
        let encoded = original.encode();
        let decoded = CompositeQueueKey::decode(&encoded).unwrap();

        assert_eq!(decoded.priority, original.priority);
        assert_eq!(decoded.timestamp, original.timestamp);
        assert_eq!(decoded.sequence, original.sequence);
        assert_eq!(decoded.task_id, original.task_id);
    }

    #[test]
    fn test_encoding_preserves_order() {
        // Each key is strictly smaller than the next; the encoded bytes must
        // sort the same way, including across the sign boundary of priority.
        let keys = [
            CompositeQueueKey::new(i64::MIN, 0, 0, ""),
            CompositeQueueKey::new(-1, u64::MAX, u64::MAX, "z"),
            CompositeQueueKey::new(0, 0, 0, ""),
            CompositeQueueKey::new(0, 0, 0, "a"),
            CompositeQueueKey::new(i64::MAX, 0, 0, ""),
        ];
        for pair in keys.windows(2) {
            assert!(pair[0] < pair[1]);
            assert!(pair[0].encode() < pair[1].encode());
        }
    }

    #[test]
    fn test_decode_rejects_malformed_input() {
        assert!(CompositeQueueKey::decode(&[0u8; 23]).is_none());

        let mut invalid_utf8 = vec![0u8; 24];
        invalid_utf8.push(0xFF);
        assert!(CompositeQueueKey::decode(&invalid_utf8).is_none());
    }

    #[test]
    fn test_queue_index_insert_pop() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<String> = QueueIndex::new(config);

        // Insert with different priorities
        index.insert(CompositeQueueKey::new(3, 100, 1, "low"), "low priority".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "high"), "high priority".to_string());
        index.insert(CompositeQueueKey::new(2, 100, 1, "medium"), "medium priority".to_string());

        assert_eq!(index.len(), 3);

        // Pop should return highest priority (lowest number) first
        let (key, value) = index.pop_min().unwrap();
        assert_eq!(key.priority, 1);
        assert_eq!(value, "high priority");

        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.priority, 2);

        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.priority, 3);

        assert!(index.is_empty());
    }

    #[test]
    fn test_queue_index_peek() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
        index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);

        // Peek should return min without removing
        let (key, value) = index.peek_min().unwrap();
        assert_eq!(key.priority, 1);
        assert_eq!(value, 2);

        // Should still have 2 items
        assert_eq!(index.len(), 2);

        // Peek again (should hit cache)
        let (key, _) = index.peek_min().unwrap();
        assert_eq!(key.priority, 1);
    }

    #[test]
    fn test_peek_after_removing_cached_min() {
        let index: QueueIndex<i32> = QueueIndex::new(QueueIndexConfig::new("test_queue"));
        let k1 = CompositeQueueKey::new(1, 0, 0, "a");
        let k2 = CompositeQueueKey::new(2, 0, 0, "b");
        let k3 = CompositeQueueKey::new(3, 0, 0, "c");

        index.insert(k2.clone(), 2);
        index.insert(k1.clone(), 1);
        index.insert(k3.clone(), 3);

        // Populate the cache with the minimum.
        assert_eq!(index.peek_min(), Some((k1.clone(), 1)));

        // Removing the cached minimum must not leave a stale cache entry.
        assert_eq!(index.remove(&k1), Some(1));
        assert_eq!(index.peek_min(), Some((k2.clone(), 2)));

        // Popping the minimum invalidates the cache as well.
        assert_eq!(index.pop_min(), Some((k2, 2)));
        assert_eq!(index.peek_min(), Some((k3.clone(), 3)));

        // A smaller key inserted later becomes the new minimum.
        let k0 = CompositeQueueKey::new(0, 0, 0, "z");
        index.insert(k0.clone(), 0);
        assert_eq!(index.peek_min(), Some((k0, 0)));
    }

    #[test]
    fn test_duplicate_insert_replaces_value() {
        let index: QueueIndex<i32> = QueueIndex::new(QueueIndexConfig::new("test_queue"));
        let key = CompositeQueueKey::new(1, 100, 1, "dup");

        index.insert(key.clone(), 1);
        index.insert(key.clone(), 2);

        assert_eq!(index.len(), 1);
        assert_eq!(index.get(&key), Some(2));
        assert_eq!(index.pop_min(), Some((key, 2)));
        assert!(index.is_empty());

        // Operations on an empty queue return nothing and do not underflow.
        assert_eq!(index.pop_min(), None);
        assert_eq!(index.peek_min(), None);
        assert_eq!(index.len(), 0);
    }

    #[test]
    fn test_queue_index_remove() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        let key1 = CompositeQueueKey::new(1, 100, 1, "task1");
        let key2 = CompositeQueueKey::new(2, 100, 1, "task2");

        index.insert(key1.clone(), 1);
        index.insert(key2.clone(), 2);

        // Remove by key
        let removed = index.remove(&key1);
        assert_eq!(removed, Some(1));
        assert_eq!(index.len(), 1);

        // Pop should return remaining item
        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.task_id, "task2");
    }

    #[test]
    fn test_scan_by_priority() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        for i in 1..=10 {
            index.insert(CompositeQueueKey::new(i, 100, 1, format!("task{}", i)), i as i32);
        }

        // Scan priority <= 3
        let results = index.scan_by_priority(3, 100);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].0.priority, 1);
        assert_eq!(results[1].0.priority, 2);
        assert_eq!(results[2].0.priority, 3);
    }

    #[test]
    fn test_scan_ready() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        // Insert tasks with different ready times
        index.insert(CompositeQueueKey::new(1, 100, 1, "ready1"), 1);
        index.insert(CompositeQueueKey::new(1, 200, 1, "ready2"), 2);
        index.insert(CompositeQueueKey::new(1, 300, 1, "future"), 3);

        // Scan ready at timestamp 200
        let results = index.scan_ready(200, 100);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();

        let queue_config = QueueIndexConfig::new("task_queue")
            .with_priority_column("priority")
            .with_timestamp_column("ready_at");

        registry.register_queue(queue_config);

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let config = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(config.priority_column, Some("priority".to_string()));
    }

    #[test]
    fn test_fifo_within_priority() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<String> = QueueIndex::new(config);

        // Insert tasks with same priority, different sequence
        index.insert(CompositeQueueKey::new(1, 100, 3, "third"), "third".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "first"), "first".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 2, "second"), "second".to_string());

        // Should pop in sequence order (FIFO)
        let (_, v1) = index.pop_min().unwrap();
        let (_, v2) = index.pop_min().unwrap();
        let (_, v3) = index.pop_min().unwrap();

        assert_eq!(v1, "first");
        assert_eq!(v2, "second");
        assert_eq!(v3, "third");
    }
}