1use std::collections::BTreeMap;
67use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
68use std::sync::Arc;
69
70use parking_lot::RwLock;
71
72use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
73use crate::key_buffer::ArenaKeyHandle;
74
75#[derive(Debug, Clone)]
81pub struct QueueIndexConfig {
82 pub base: TableIndexConfig,
84 pub priority_column: Option<String>,
86 pub timestamp_column: Option<String>,
88 pub fifo_column: Option<String>,
90 pub enable_min_key_cache: bool,
92 pub enable_size_tracking: bool,
94}
95
96impl QueueIndexConfig {
97 pub fn new(queue_name: impl Into<String>) -> Self {
99 Self {
100 base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
101 priority_column: None,
102 timestamp_column: None,
103 fifo_column: None,
104 enable_min_key_cache: true,
105 enable_size_tracking: true,
106 }
107 }
108
109 pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
111 self.priority_column = Some(column.into());
112 self
113 }
114
115 pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
117 self.timestamp_column = Some(column.into());
118 self
119 }
120
121 pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
123 self.fifo_column = Some(column.into());
124 self
125 }
126
127 pub fn with_min_key_cache(mut self, enable: bool) -> Self {
129 self.enable_min_key_cache = enable;
130 self
131 }
132
133 pub fn with_size_tracking(mut self, enable: bool) -> Self {
135 self.enable_size_tracking = enable;
136 self
137 }
138
139 pub fn key_columns(&self) -> Vec<&str> {
141 let mut columns = Vec::new();
142 if let Some(ref col) = self.priority_column {
143 columns.push(col.as_str());
144 }
145 if let Some(ref col) = self.timestamp_column {
146 columns.push(col.as_str());
147 }
148 if let Some(ref col) = self.fifo_column {
149 columns.push(col.as_str());
150 }
151 columns
152 }
153}
154
155pub struct QueueIndex<V: Clone + Send + Sync> {
179 entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
181 min_key_cache: RwLock<Option<CompositeQueueKey>>,
183 size: AtomicUsize,
185 version: AtomicU64,
187 config: QueueIndexConfig,
189}
190
/// Ordering key for queue entries.
///
/// The derived `Ord` compares fields in declaration order: `priority`,
/// then `timestamp`, then `sequence`, then `task_id`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    /// Primary sort field; smaller values sort first.
    pub priority: i64,
    /// Second sort field.
    pub timestamp: u64,
    /// Third sort field, breaking ties between equal priority and timestamp.
    pub sequence: u64,
    /// Final tie-breaker and identifier of the task.
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Builds a key from its four components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serializes the key into bytes whose lexicographic order matches the
    /// key's `Ord`.
    ///
    /// Layout: 8 bytes of sign-biased priority, 8 bytes of timestamp,
    /// 8 bytes of sequence (all big-endian), followed by the raw UTF-8 bytes
    /// of `task_id`.
    pub fn encode(&self) -> Vec<u8> {
        // Flipping the sign bit maps i64::MIN..=i64::MAX onto 0..=u64::MAX
        // while preserving order, so negative priorities sort first.
        const SIGN_BIT: u64 = 1 << 63;

        let id_bytes = self.task_id.as_bytes();
        let biased_priority = (self.priority as u64) ^ SIGN_BIT;

        let mut out = Vec::with_capacity(32 + id_bytes.len());
        for word in [biased_priority, self.timestamp, self.sequence].iter() {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(id_bytes);
        out
    }

    /// Parses bytes produced by [`encode`](Self::encode).
    ///
    /// Returns `None` when the input is shorter than the 24-byte numeric
    /// header or when the trailing task id is not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        const SIGN_BIT: u64 = 1 << 63;
        const HEADER_LEN: usize = 24;

        if bytes.len() < HEADER_LEN {
            return None;
        }
        let (header, id_bytes) = bytes.split_at(HEADER_LEN);

        // The header is exactly three big-endian u64 words.
        let mut words = header.chunks_exact(8).map(|chunk| {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(chunk);
            u64::from_be_bytes(raw)
        });

        let priority = (words.next()? ^ SIGN_BIT) as i64;
        let timestamp = words.next()?;
        let sequence = words.next()?;
        let task_id = std::str::from_utf8(id_bytes).ok()?.to_owned();

        Some(Self {
            priority,
            timestamp,
            sequence,
            task_id,
        })
    }
}
258
259impl<V: Clone + Send + Sync> QueueIndex<V> {
260 pub fn new(config: QueueIndexConfig) -> Self {
262 Self {
263 entries: RwLock::new(BTreeMap::new()),
264 min_key_cache: RwLock::new(None),
265 size: AtomicUsize::new(0),
266 version: AtomicU64::new(0),
267 config,
268 }
269 }
270
271 pub fn insert(&self, key: CompositeQueueKey, value: V) {
275 let is_new_min = {
276 let entries = self.entries.read();
277 entries.first_key_value()
278 .map(|(min, _)| &key < min)
279 .unwrap_or(true)
280 };
281
282 {
283 let mut entries = self.entries.write();
284 let was_absent = entries.insert(key.clone(), value).is_none();
285
286 if was_absent {
287 self.size.fetch_add(1, Ordering::Relaxed);
288 }
289 }
290
291 if is_new_min && self.config.enable_min_key_cache {
293 *self.min_key_cache.write() = Some(key);
294 }
295
296 self.version.fetch_add(1, Ordering::Release);
297 }
298
299 pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
303 if self.config.enable_min_key_cache {
305 let cache = self.min_key_cache.read();
306 if let Some(ref cached_key) = *cache {
307 let entries = self.entries.read();
308 if let Some(value) = entries.get(cached_key) {
309 return Some((cached_key.clone(), value.clone()));
310 }
311 }
312 }
313
314 let entries = self.entries.read();
316 let result = entries.first_key_value()
317 .map(|(k, v)| (k.clone(), v.clone()));
318
319 if self.config.enable_min_key_cache {
321 if let Some((ref key, _)) = result {
322 *self.min_key_cache.write() = Some(key.clone());
323 }
324 }
325
326 result
327 }
328
329 pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
333 let result = {
334 let mut entries = self.entries.write();
335 entries.pop_first()
336 };
337
338 if result.is_some() {
339 self.size.fetch_sub(1, Ordering::Relaxed);
340
341 if self.config.enable_min_key_cache {
343 *self.min_key_cache.write() = None;
344 }
345
346 self.version.fetch_add(1, Ordering::Release);
347 }
348
349 result
350 }
351
352 pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
356 let result = {
357 let mut entries = self.entries.write();
358 entries.remove(key)
359 };
360
361 if result.is_some() {
362 self.size.fetch_sub(1, Ordering::Relaxed);
363
364 if self.config.enable_min_key_cache {
366 let should_invalidate = {
367 let cache = self.min_key_cache.read();
368 cache.as_ref().map(|c| c == key).unwrap_or(false)
369 };
370 if should_invalidate {
371 *self.min_key_cache.write() = None;
372 }
373 }
374
375 self.version.fetch_add(1, Ordering::Release);
376 }
377
378 result
379 }
380
381 pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
385 self.entries.read().get(key).cloned()
386 }
387
388 pub fn contains(&self, key: &CompositeQueueKey) -> bool {
392 self.entries.read().contains_key(key)
393 }
394
395 pub fn len(&self) -> usize {
399 if self.config.enable_size_tracking {
400 self.size.load(Ordering::Relaxed)
401 } else {
402 self.entries.read().len()
403 }
404 }
405
406 pub fn is_empty(&self) -> bool {
410 self.len() == 0
411 }
412
413 pub fn version(&self) -> u64 {
415 self.version.load(Ordering::Acquire)
416 }
417
418 pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
424 let entries = self.entries.read();
425
426 entries.iter()
427 .take_while(|(k, _)| k.priority <= max_priority)
428 .take(limit)
429 .map(|(k, v)| (k.clone(), v.clone()))
430 .collect()
431 }
432
433 pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
437 let entries = self.entries.read();
438
439 entries.iter()
440 .filter(|(k, _)| k.timestamp <= now)
441 .take(limit)
442 .map(|(k, v)| (k.clone(), v.clone()))
443 .collect()
444 }
445
446 pub fn config(&self) -> &QueueIndexConfig {
448 &self.config
449 }
450}
451
452pub struct QueueTableRegistry {
458 base: TableIndexRegistry,
460 queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
462}
463
464impl QueueTableRegistry {
465 pub fn new() -> Self {
467 Self {
468 base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
469 queue_configs: RwLock::new(std::collections::HashMap::new()),
470 }
471 }
472
473 pub fn register_queue(&self, config: QueueIndexConfig) {
475 self.base.configure_table(config.base.clone());
477
478 self.queue_configs.write().insert(
480 config.base.table_name.clone(),
481 config,
482 );
483 }
484
485 pub fn is_queue(&self, table_name: &str) -> bool {
487 self.queue_configs.read().contains_key(table_name)
488 }
489
490 pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
492 self.queue_configs.read().get(table_name).cloned()
493 }
494
495 pub fn base(&self) -> &TableIndexRegistry {
497 &self.base
498 }
499}
500
501impl Default for QueueTableRegistry {
502 fn default() -> Self {
503 Self::new()
504 }
505}
506
/// Plain-data snapshot of queue index statistics.
///
/// NOTE(review): nothing in the visible source populates this struct; the
/// field descriptions below follow the field names and should be confirmed
/// against the producer.
#[derive(Clone, Debug, Default)]
pub struct QueueIndexStats {
    /// Number of entries (presumably at snapshot time).
    pub size: usize,
    /// Count of insert operations (presumably cumulative).
    pub inserts: u64,
    /// Count of pop operations (presumably cumulative).
    pub pops: u64,
    /// Count of peek operations (presumably cumulative).
    pub peeks: u64,
    /// Cache hit rate; presumably a fraction in `0.0..=1.0` (confirm).
    pub cache_hit_rate: f64,
}
525
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty index configured for the shared "test_queue" name.
    fn empty_index<V: Clone + Send + Sync>() -> QueueIndex<V> {
        QueueIndex::new(QueueIndexConfig::new("test_queue"))
    }

    #[test]
    fn test_composite_key_ordering() {
        let baseline = CompositeQueueKey::new(1, 100, 1, "task1");
        let higher_priority = CompositeQueueKey::new(2, 100, 1, "task2");
        let later_timestamp = CompositeQueueKey::new(1, 200, 1, "task3");
        let later_sequence = CompositeQueueKey::new(1, 100, 2, "task4");

        // Priority dominates the ordering.
        assert!(baseline < higher_priority);
        // With equal priority, the timestamp decides.
        assert!(baseline < later_timestamp);
        // With equal priority and timestamp, the sequence decides.
        assert!(baseline < later_sequence);
    }

    #[test]
    fn test_composite_key_encode_decode() {
        let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");

        let round_tripped = CompositeQueueKey::decode(&original.encode()).unwrap();

        // The key has exactly these four fields, so whole-value equality
        // checks each of them.
        assert_eq!(round_tripped, original);
    }

    #[test]
    fn test_queue_index_insert_pop() {
        let index: QueueIndex<String> = empty_index();

        // Inserted deliberately out of priority order.
        let tasks = [
            (3, "low", "low priority"),
            (1, "high", "high priority"),
            (2, "medium", "medium priority"),
        ];
        for &(priority, id, payload) in tasks.iter() {
            index.insert(CompositeQueueKey::new(priority, 100, 1, id), payload.to_string());
        }
        assert_eq!(index.len(), 3);

        let (first_key, first_value) = index.pop_min().unwrap();
        assert_eq!(first_key.priority, 1);
        assert_eq!(first_value, "high priority");

        for expected_priority in 2..=3 {
            let (key, _) = index.pop_min().unwrap();
            assert_eq!(key.priority, expected_priority);
        }

        assert!(index.is_empty());
    }

    #[test]
    fn test_queue_index_peek() {
        let index: QueueIndex<i32> = empty_index();
        index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
        index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);

        let (head_key, head_value) = index.peek_min().unwrap();
        assert_eq!(head_key.priority, 1);
        assert_eq!(head_value, 2);

        // Peeking must not consume the entry.
        assert_eq!(index.len(), 2);

        // A second peek observes the same head.
        let (head_again, _) = index.peek_min().unwrap();
        assert_eq!(head_again.priority, 1);
    }

    #[test]
    fn test_queue_index_remove() {
        let index: QueueIndex<i32> = empty_index();
        let first = CompositeQueueKey::new(1, 100, 1, "task1");
        let second = CompositeQueueKey::new(2, 100, 1, "task2");
        index.insert(first.clone(), 1);
        index.insert(second.clone(), 2);

        assert_eq!(index.remove(&first), Some(1));
        assert_eq!(index.len(), 1);

        // The remaining entry is now the head of the queue.
        let (remaining, _) = index.pop_min().unwrap();
        assert_eq!(remaining.task_id, "task2");
    }

    #[test]
    fn test_scan_by_priority() {
        let index: QueueIndex<i32> = empty_index();
        (1..=10i64).for_each(|priority| {
            let key = CompositeQueueKey::new(priority, 100, 1, format!("task{}", priority));
            index.insert(key, priority as i32);
        });

        let priorities: Vec<i64> = index
            .scan_by_priority(3, 100)
            .iter()
            .map(|(key, _)| key.priority)
            .collect();

        assert_eq!(priorities, vec![1, 2, 3]);
    }

    #[test]
    fn test_scan_ready() {
        let index: QueueIndex<i32> = empty_index();
        let tasks = [(100, "ready1", 1), (200, "ready2", 2), (300, "future", 3)];
        for &(timestamp, id, value) in tasks.iter() {
            index.insert(CompositeQueueKey::new(1, timestamp, 1, id), value);
        }

        // Only the entries with timestamp <= 200 qualify.
        let ready = index.scan_ready(200, 100);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();
        registry.register_queue(
            QueueIndexConfig::new("task_queue")
                .with_priority_column("priority")
                .with_timestamp_column("ready_at"),
        );

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let stored = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(stored.priority_column, Some("priority".to_string()));
    }

    #[test]
    fn test_fifo_within_priority() {
        let index: QueueIndex<String> = empty_index();

        // Same priority and timestamp; only the sequence differs.
        for &(sequence, label) in [(3, "third"), (1, "first"), (2, "second")].iter() {
            index.insert(CompositeQueueKey::new(1, 100, sequence, label), label.to_string());
        }

        let drained: Vec<String> = (0..3).map(|_| index.pop_min().unwrap().1).collect();

        assert_eq!(drained, ["first", "second", "third"]);
    }
}