Skip to main content

sochdb_storage/
queue_index.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Queue-Optimized Index Policy
19//!
20//! This module extends the per-table index policy with queue-specific
21//! optimizations that ensure efficient priority queue operations.
22//!
23//! ## Queue Access Patterns
24//!
25//! Queues have specific access patterns that differ from general tables:
26//!
27//! | Operation | Pattern                              | Requirement          |
28//! |-----------|--------------------------------------|----------------------|
29//! | Enqueue   | Insert at any position               | O(log N) or better   |
30//! | Dequeue   | Find minimum key, delete it          | O(log N) find + O(1) delete |
31//! | Peek      | Read minimum key without deletion    | O(log N)             |
32//! | Count     | Get queue size                       | O(1)                 |
33//!
34//! ## Why Queue Tables Need ScanOptimized Policy
35//!
36//! The dequeue operation requires "find minimum key", which is:
37//! - O(log N) with ordered index (ScanOptimized)
38//! - O(N) without ordered index (WriteOptimized/Balanced with deferred sort)
39//!
40//! For a queue with 10,000 tasks:
41//! - With ScanOptimized: ~14 comparisons per dequeue
42//! - With WriteOptimized: ~10,000 comparisons per dequeue
43//!
44//! ## Avoiding Deferred-Sort Latency Spikes
45//!
46//! The Balanced policy uses "deferred sorting" where writes are O(1) append
47//! and scans trigger O(N log N) sort-on-demand. This creates latency spikes:
48//!
49//! ```text
50//! Pop #1: 0.1ms (memtable small)
51//! Pop #2: 0.1ms
52//! ...
53//! Pop #1000: 50ms (sort triggered!) ← Latency spike
54//! Pop #1001: 0.2ms (now sorted)
55//! ```
56//!
57//! ScanOptimized maintains order on every write, giving predictable latency.
58//!
59//! ## Queue Index Configuration
60//!
//! ```rust,ignore
//! let config = QueueIndexConfig::new("task_queue")
//!     .with_priority_column("priority")
//!     .with_timestamp_column("ready_at")
//!     .with_fifo_column("sequence");
//! ```
68
69use std::collections::BTreeMap;
70use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
71
72use parking_lot::RwLock;
73
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75
76// ============================================================================
77// QueueIndexConfig - Queue-Specific Configuration
78// ============================================================================
79
80/// Configuration for queue tables
81#[derive(Debug, Clone)]
82pub struct QueueIndexConfig {
83    /// Base table configuration
84    pub base: TableIndexConfig,
85    /// Name of the priority column (for composite key ordering)
86    pub priority_column: Option<String>,
87    /// Name of the timestamp column (for ready-time ordering)
88    pub timestamp_column: Option<String>,
89    /// Name of the sequence column (for FIFO within same priority)
90    pub fifo_column: Option<String>,
91    /// Whether to maintain min-key cache for O(1) peek
92    pub enable_min_key_cache: bool,
93    /// Whether to track queue size for O(1) count
94    pub enable_size_tracking: bool,
95}
96
97impl QueueIndexConfig {
98    /// Create a new queue index config
99    pub fn new(queue_name: impl Into<String>) -> Self {
100        Self {
101            base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
102            priority_column: None,
103            timestamp_column: None,
104            fifo_column: None,
105            enable_min_key_cache: true,
106            enable_size_tracking: true,
107        }
108    }
109
110    /// Set the priority column name
111    pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
112        self.priority_column = Some(column.into());
113        self
114    }
115
116    /// Set the timestamp column name
117    pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
118        self.timestamp_column = Some(column.into());
119        self
120    }
121
122    /// Set the FIFO sequence column name
123    pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
124        self.fifo_column = Some(column.into());
125        self
126    }
127
128    /// Enable or disable min-key cache
129    pub fn with_min_key_cache(mut self, enable: bool) -> Self {
130        self.enable_min_key_cache = enable;
131        self
132    }
133
134    /// Enable or disable size tracking
135    pub fn with_size_tracking(mut self, enable: bool) -> Self {
136        self.enable_size_tracking = enable;
137        self
138    }
139
140    /// Get the composite key columns for this queue
141    pub fn key_columns(&self) -> Vec<&str> {
142        let mut columns = Vec::new();
143        if let Some(ref col) = self.priority_column {
144            columns.push(col.as_str());
145        }
146        if let Some(ref col) = self.timestamp_column {
147            columns.push(col.as_str());
148        }
149        if let Some(ref col) = self.fifo_column {
150            columns.push(col.as_str());
151        }
152        columns
153    }
154}
155
156// ============================================================================
157// QueueIndex - Queue-Optimized Index Structure
158// ============================================================================
159
160/// A queue-optimized ordered index
161///
162/// This provides efficient priority queue operations by:
163/// 1. Maintaining a BTreeMap for O(log N) min-key access
164/// 2. Caching the minimum key for O(1) peek
165/// 3. Tracking size for O(1) count
166///
167/// ## Internal Structure
168///
169/// ```text
170/// ┌─────────────────────────────────────────────────────────────────────┐
171/// │                         QueueIndex                                   │
172/// ├─────────────────────────────────────────────────────────────────────┤
173/// │ entries: BTreeMap<CompositeKey, Value>  ← O(log N) ordered ops      │
174/// │ min_key_cache: Option<CompositeKey>     ← O(1) peek                 │
175/// │ size: AtomicUsize                       ← O(1) count                │
176/// │ version: AtomicU64                      ← For cache invalidation    │
177/// └─────────────────────────────────────────────────────────────────────┘
178/// ```
179pub struct QueueIndex<V: Clone + Send + Sync> {
180    /// The ordered entries
181    entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
182    /// Cached minimum key (invalidated on mutation)
183    min_key_cache: RwLock<Option<CompositeQueueKey>>,
184    /// Current size
185    size: AtomicUsize,
186    /// Version counter for cache invalidation
187    version: AtomicU64,
188    /// Configuration
189    config: QueueIndexConfig,
190}
191
/// Ordering key for a queued task.
///
/// The derived `Ord` compares fields in declaration order: `priority`, then
/// `timestamp`, then `sequence`, then `task_id`. The field order is therefore
/// part of the type's behavior and must not be rearranged.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    /// First sort field; smaller values sort (and are dequeued) earlier.
    pub priority: i64,
    /// Second sort field: the time at which the task becomes ready.
    pub timestamp: u64,
    /// Third sort field: breaks ties between equal priority and timestamp.
    pub sequence: u64,
    /// Identifier of the task; also the final tie-breaker.
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Flipping this bit maps `i64` onto `u64` while keeping numeric order.
    const PRIORITY_SIGN_BIT: u64 = 1 << 63;
    /// Three big-endian 64-bit words precede the task id in the encoding.
    const FIXED_PREFIX_LEN: usize = 24;

    /// Assemble a key from its four components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serialize the key to bytes.
    ///
    /// Layout: biased priority, timestamp and sequence as big-endian `u64`
    /// words (24 bytes in total), followed by the raw UTF-8 bytes of the task
    /// id.
    pub fn encode(&self) -> Vec<u8> {
        let id = self.task_id.as_bytes();

        // Toggling the sign bit sends i64::MIN to 0 and i64::MAX to u64::MAX,
        // so unsigned big-endian comparison agrees with signed comparison.
        let biased_priority = (self.priority as u64) ^ Self::PRIORITY_SIGN_BIT;

        // Room for the 24-byte fixed prefix (with a little slack) plus the id.
        let mut out = Vec::with_capacity(32 + id.len());
        for word in &[biased_priority, self.timestamp, self.sequence] {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(id);
        out
    }

    /// Parse a key previously produced by [`CompositeQueueKey::encode`].
    ///
    /// Returns `None` when `bytes` is shorter than the 24-byte fixed prefix or
    /// when the trailing task id is not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::FIXED_PREFIX_LEN {
            return None;
        }
        let (prefix, id_bytes) = bytes.split_at(Self::FIXED_PREFIX_LEN);

        // The prefix is exactly three 8-byte big-endian words.
        let mut words = prefix.chunks_exact(8).map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            u64::from_be_bytes(buf)
        });

        // Undo the sign-bit flip applied by `encode`.
        let priority = (words.next()? ^ Self::PRIORITY_SIGN_BIT) as i64;
        let timestamp = words.next()?;
        let sequence = words.next()?;
        let task_id = std::str::from_utf8(id_bytes).ok()?.to_owned();

        Some(Self {
            priority,
            timestamp,
            sequence,
            task_id,
        })
    }
}
259
260impl<V: Clone + Send + Sync> QueueIndex<V> {
261    /// Create a new queue index
262    pub fn new(config: QueueIndexConfig) -> Self {
263        Self {
264            entries: RwLock::new(BTreeMap::new()),
265            min_key_cache: RwLock::new(None),
266            size: AtomicUsize::new(0),
267            version: AtomicU64::new(0),
268            config,
269        }
270    }
271
272    /// Insert a task into the queue
273    ///
274    /// Complexity: O(log N)
275    pub fn insert(&self, key: CompositeQueueKey, value: V) {
276        let is_new_min = {
277            let entries = self.entries.read();
278            entries
279                .first_key_value()
280                .map(|(min, _)| &key < min)
281                .unwrap_or(true)
282        };
283
284        {
285            let mut entries = self.entries.write();
286            let was_absent = entries.insert(key.clone(), value).is_none();
287
288            if was_absent {
289                self.size.fetch_add(1, Ordering::Relaxed);
290            }
291        }
292
293        // Update min cache if this is the new minimum
294        if is_new_min && self.config.enable_min_key_cache {
295            *self.min_key_cache.write() = Some(key);
296        }
297
298        self.version.fetch_add(1, Ordering::Release);
299    }
300
301    /// Peek at the minimum key without removing it
302    ///
303    /// Complexity: O(1) if cache hit, O(log N) if cache miss
304    pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
305        // Try cache first
306        if self.config.enable_min_key_cache {
307            let cache = self.min_key_cache.read();
308            if let Some(ref cached_key) = *cache {
309                let entries = self.entries.read();
310                if let Some(value) = entries.get(cached_key) {
311                    return Some((cached_key.clone(), value.clone()));
312                }
313            }
314        }
315
316        // Cache miss - scan
317        let entries = self.entries.read();
318        let result = entries
319            .first_key_value()
320            .map(|(k, v)| (k.clone(), v.clone()));
321
322        // Update cache
323        if self.config.enable_min_key_cache {
324            if let Some((ref key, _)) = result {
325                *self.min_key_cache.write() = Some(key.clone());
326            }
327        }
328
329        result
330    }
331
332    /// Remove and return the minimum entry
333    ///
334    /// Complexity: O(log N)
335    pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
336        let result = {
337            let mut entries = self.entries.write();
338            entries.pop_first()
339        };
340
341        if result.is_some() {
342            self.size.fetch_sub(1, Ordering::Relaxed);
343
344            // Invalidate cache
345            if self.config.enable_min_key_cache {
346                *self.min_key_cache.write() = None;
347            }
348
349            self.version.fetch_add(1, Ordering::Release);
350        }
351
352        result
353    }
354
355    /// Remove a specific entry by key
356    ///
357    /// Complexity: O(log N)
358    pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
359        let result = {
360            let mut entries = self.entries.write();
361            entries.remove(key)
362        };
363
364        if result.is_some() {
365            self.size.fetch_sub(1, Ordering::Relaxed);
366
367            // Invalidate cache if we removed the cached min
368            if self.config.enable_min_key_cache {
369                let should_invalidate = {
370                    let cache = self.min_key_cache.read();
371                    cache.as_ref().map(|c| c == key).unwrap_or(false)
372                };
373                if should_invalidate {
374                    *self.min_key_cache.write() = None;
375                }
376            }
377
378            self.version.fetch_add(1, Ordering::Release);
379        }
380
381        result
382    }
383
384    /// Get an entry by key
385    ///
386    /// Complexity: O(log N)
387    pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
388        self.entries.read().get(key).cloned()
389    }
390
391    /// Check if a key exists
392    ///
393    /// Complexity: O(log N)
394    pub fn contains(&self, key: &CompositeQueueKey) -> bool {
395        self.entries.read().contains_key(key)
396    }
397
398    /// Get queue size
399    ///
400    /// Complexity: O(1)
401    pub fn len(&self) -> usize {
402        if self.config.enable_size_tracking {
403            self.size.load(Ordering::Relaxed)
404        } else {
405            self.entries.read().len()
406        }
407    }
408
409    /// Check if queue is empty
410    ///
411    /// Complexity: O(1)
412    pub fn is_empty(&self) -> bool {
413        self.len() == 0
414    }
415
416    /// Get current version (for change detection)
417    pub fn version(&self) -> u64 {
418        self.version.load(Ordering::Acquire)
419    }
420
421    /// Scan entries with priority <= threshold
422    ///
423    /// Useful for batch processing of high-priority tasks.
424    ///
425    /// Complexity: O(log N + K) where K is result count
426    pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
427        let entries = self.entries.read();
428
429        entries
430            .iter()
431            .take_while(|(k, _)| k.priority <= max_priority)
432            .take(limit)
433            .map(|(k, v)| (k.clone(), v.clone()))
434            .collect()
435    }
436
437    /// Scan entries ready at or before the given timestamp
438    ///
439    /// Complexity: O(N) in worst case, but typically O(K) if data is time-ordered
440    pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
441        let entries = self.entries.read();
442
443        entries
444            .iter()
445            .filter(|(k, _)| k.timestamp <= now)
446            .take(limit)
447            .map(|(k, v)| (k.clone(), v.clone()))
448            .collect()
449    }
450
451    /// Get the configuration
452    pub fn config(&self) -> &QueueIndexConfig {
453        &self.config
454    }
455}
456
457// ============================================================================
458// QueueTableRegistry - Queue-Aware Table Registry
459// ============================================================================
460
461/// Registry extension for queue tables
462pub struct QueueTableRegistry {
463    /// Base registry
464    base: TableIndexRegistry,
465    /// Queue-specific configs
466    queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
467}
468
469impl QueueTableRegistry {
470    /// Create a new registry
471    pub fn new() -> Self {
472        Self {
473            base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
474            queue_configs: RwLock::new(std::collections::HashMap::new()),
475        }
476    }
477
478    /// Register a table as a queue
479    pub fn register_queue(&self, config: QueueIndexConfig) {
480        // Register base config with ScanOptimized policy
481        self.base.configure_table(config.base.clone());
482
483        // Store queue-specific config
484        self.queue_configs
485            .write()
486            .insert(config.base.table_name.clone(), config);
487    }
488
489    /// Check if a table is registered as a queue
490    pub fn is_queue(&self, table_name: &str) -> bool {
491        self.queue_configs.read().contains_key(table_name)
492    }
493
494    /// Get queue config
495    pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
496        self.queue_configs.read().get(table_name).cloned()
497    }
498
499    /// Get the base registry
500    pub fn base(&self) -> &TableIndexRegistry {
501        &self.base
502    }
503}
504
505impl Default for QueueTableRegistry {
506    fn default() -> Self {
507        Self::new()
508    }
509}
510
511// ============================================================================
512// QueueStats - Queue Statistics
513// ============================================================================
514
/// Snapshot of counters describing a queue index.
///
/// A plain data carrier: every field is public and `Default` yields all
/// zeros.
#[derive(Debug, Clone, Default)]
pub struct QueueIndexStats {
    /// Number of entries in the queue.
    pub size: usize,
    /// Count of insert operations.
    pub inserts: u64,
    /// Count of pop operations.
    pub pops: u64,
    /// Count of peek operations.
    pub peeks: u64,
    /// Hit rate of the min-key cache for peek operations.
    pub cache_hit_rate: f64,
}
529
530// ============================================================================
531// Tests
532// ============================================================================
533
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an empty index named `test_queue` with the default queue settings.
    fn empty_index<V: Clone + Send + Sync>() -> QueueIndex<V> {
        QueueIndex::new(QueueIndexConfig::new("test_queue"))
    }

    /// Shorthand for constructing a composite key.
    fn key(priority: i64, timestamp: u64, sequence: u64, task_id: &str) -> CompositeQueueKey {
        CompositeQueueKey::new(priority, timestamp, sequence, task_id)
    }

    #[test]
    fn test_composite_key_ordering() {
        let base = key(1, 100, 1, "task1");

        // A larger priority number sorts later.
        assert!(base < key(2, 100, 1, "task2"));

        // With equal priority, the later timestamp sorts later.
        assert!(base < key(1, 200, 1, "task3"));

        // With equal priority and timestamp, the larger sequence sorts later.
        assert!(base < key(1, 100, 2, "task4"));
    }

    #[test]
    fn test_composite_key_encode_decode() {
        let original = key(-100, 12345, 999, "my-task-id");

        let decoded = CompositeQueueKey::decode(&original.encode()).unwrap();

        // Key equality covers all four components.
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_queue_index_insert_pop() {
        let index = empty_index::<String>();

        // Inserted out of priority order on purpose.
        for &(priority, task_id, payload) in &[
            (3, "low", "low priority"),
            (1, "high", "high priority"),
            (2, "medium", "medium priority"),
        ] {
            index.insert(key(priority, 100, 1, task_id), payload.to_string());
        }
        assert_eq!(index.len(), 3);

        // The smallest priority number leaves the queue first.
        let (first_key, first_value) = index.pop_min().unwrap();
        assert_eq!(first_key.priority, 1);
        assert_eq!(first_value, "high priority");

        let (second_key, _) = index.pop_min().unwrap();
        assert_eq!(second_key.priority, 2);

        let (third_key, _) = index.pop_min().unwrap();
        assert_eq!(third_key.priority, 3);

        assert!(index.is_empty());
    }

    #[test]
    fn test_queue_index_peek() {
        let index = empty_index::<i32>();
        index.insert(key(2, 100, 1, "task1"), 1);
        index.insert(key(1, 100, 1, "task2"), 2);

        // Peeking reports the minimum...
        let (peeked_key, peeked_value) = index.peek_min().unwrap();
        assert_eq!(peeked_key.priority, 1);
        assert_eq!(peeked_value, 2);

        // ...and leaves both entries in place.
        assert_eq!(index.len(), 2);

        // A second peek (served via the cache) agrees with the first.
        let (peeked_again, _) = index.peek_min().unwrap();
        assert_eq!(peeked_again.priority, 1);
    }

    #[test]
    fn test_queue_index_remove() {
        let index = empty_index::<i32>();
        let first = key(1, 100, 1, "task1");
        let second = key(2, 100, 1, "task2");
        index.insert(first.clone(), 1);
        index.insert(second.clone(), 2);

        // Removing by key yields the stored value and shrinks the queue.
        assert_eq!(index.remove(&first), Some(1));
        assert_eq!(index.len(), 1);

        // The surviving entry is what pops next.
        let (remaining, _) = index.pop_min().unwrap();
        assert_eq!(remaining.task_id, "task2");
    }

    #[test]
    fn test_scan_by_priority() {
        let index = empty_index::<i32>();
        for priority in 1..=10 {
            let task_key = CompositeQueueKey::new(priority, 100, 1, format!("task{}", priority));
            index.insert(task_key, priority as i32);
        }

        // Only priorities 1, 2 and 3 fall at or below the threshold.
        let results = index.scan_by_priority(3, 100);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].0.priority, 1);
        assert_eq!(results[1].0.priority, 2);
        assert_eq!(results[2].0.priority, 3);
    }

    #[test]
    fn test_scan_ready() {
        let index = empty_index::<i32>();

        // Same priority, three different ready times.
        index.insert(key(1, 100, 1, "ready1"), 1);
        index.insert(key(1, 200, 1, "ready2"), 2);
        index.insert(key(1, 300, 1, "future"), 3);

        // At time 200 the first two are ready; the third is not.
        let results = index.scan_ready(200, 100);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();

        registry.register_queue(
            QueueIndexConfig::new("task_queue")
                .with_priority_column("priority")
                .with_timestamp_column("ready_at"),
        );

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let stored = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(stored.priority_column, Some("priority".to_string()));
    }

    #[test]
    fn test_fifo_within_priority() {
        let index = empty_index::<String>();

        // Equal priority and timestamp; only the sequence number differs.
        for &(sequence, label) in &[(3, "third"), (1, "first"), (2, "second")] {
            index.insert(key(1, 100, sequence, label), label.to_string());
        }

        // Entries come back in sequence order, i.e. FIFO.
        let (_, v1) = index.pop_min().unwrap();
        let (_, v2) = index.pop_min().unwrap();
        let (_, v3) = index.pop_min().unwrap();

        assert_eq!(v1, "first");
        assert_eq!(v2, "second");
        assert_eq!(v3, "third");
    }
}
724}