1use std::collections::BTreeMap;
70use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
71
72use parking_lot::RwLock;
73
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75
76#[derive(Debug, Clone)]
82pub struct QueueIndexConfig {
83 pub base: TableIndexConfig,
85 pub priority_column: Option<String>,
87 pub timestamp_column: Option<String>,
89 pub fifo_column: Option<String>,
91 pub enable_min_key_cache: bool,
93 pub enable_size_tracking: bool,
95}
96
97impl QueueIndexConfig {
98 pub fn new(queue_name: impl Into<String>) -> Self {
100 Self {
101 base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
102 priority_column: None,
103 timestamp_column: None,
104 fifo_column: None,
105 enable_min_key_cache: true,
106 enable_size_tracking: true,
107 }
108 }
109
110 pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
112 self.priority_column = Some(column.into());
113 self
114 }
115
116 pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
118 self.timestamp_column = Some(column.into());
119 self
120 }
121
122 pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
124 self.fifo_column = Some(column.into());
125 self
126 }
127
128 pub fn with_min_key_cache(mut self, enable: bool) -> Self {
130 self.enable_min_key_cache = enable;
131 self
132 }
133
134 pub fn with_size_tracking(mut self, enable: bool) -> Self {
136 self.enable_size_tracking = enable;
137 self
138 }
139
140 pub fn key_columns(&self) -> Vec<&str> {
142 let mut columns = Vec::new();
143 if let Some(ref col) = self.priority_column {
144 columns.push(col.as_str());
145 }
146 if let Some(ref col) = self.timestamp_column {
147 columns.push(col.as_str());
148 }
149 if let Some(ref col) = self.fifo_column {
150 columns.push(col.as_str());
151 }
152 columns
153 }
154}
155
156pub struct QueueIndex<V: Clone + Send + Sync> {
180 entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
182 min_key_cache: RwLock<Option<CompositeQueueKey>>,
184 size: AtomicUsize,
186 version: AtomicU64,
188 config: QueueIndexConfig,
190}
191
192#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
196pub struct CompositeQueueKey {
197 pub priority: i64,
199 pub timestamp: u64,
201 pub sequence: u64,
203 pub task_id: String,
205}
206
207impl CompositeQueueKey {
208 pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
210 Self {
211 priority,
212 timestamp,
213 sequence,
214 task_id: task_id.into(),
215 }
216 }
217
218 pub fn encode(&self) -> Vec<u8> {
220 let mut bytes = Vec::with_capacity(32 + self.task_id.len());
221
222 let priority_encoded = (self.priority as i128 + i64::MAX as i128 + 1) as u64;
224 bytes.extend_from_slice(&priority_encoded.to_be_bytes());
225
226 bytes.extend_from_slice(&self.timestamp.to_be_bytes());
228
229 bytes.extend_from_slice(&self.sequence.to_be_bytes());
231
232 bytes.extend_from_slice(self.task_id.as_bytes());
234
235 bytes
236 }
237
238 pub fn decode(bytes: &[u8]) -> Option<Self> {
240 if bytes.len() < 24 {
241 return None;
242 }
243
244 let priority_encoded = u64::from_be_bytes(bytes[0..8].try_into().ok()?);
245 let priority = (priority_encoded as i128 - i64::MAX as i128 - 1) as i64;
246
247 let timestamp = u64::from_be_bytes(bytes[8..16].try_into().ok()?);
248 let sequence = u64::from_be_bytes(bytes[16..24].try_into().ok()?);
249 let task_id = String::from_utf8(bytes[24..].to_vec()).ok()?;
250
251 Some(Self {
252 priority,
253 timestamp,
254 sequence,
255 task_id,
256 })
257 }
258}
259
260impl<V: Clone + Send + Sync> QueueIndex<V> {
261 pub fn new(config: QueueIndexConfig) -> Self {
263 Self {
264 entries: RwLock::new(BTreeMap::new()),
265 min_key_cache: RwLock::new(None),
266 size: AtomicUsize::new(0),
267 version: AtomicU64::new(0),
268 config,
269 }
270 }
271
272 pub fn insert(&self, key: CompositeQueueKey, value: V) {
276 let is_new_min = {
277 let entries = self.entries.read();
278 entries
279 .first_key_value()
280 .map(|(min, _)| &key < min)
281 .unwrap_or(true)
282 };
283
284 {
285 let mut entries = self.entries.write();
286 let was_absent = entries.insert(key.clone(), value).is_none();
287
288 if was_absent {
289 self.size.fetch_add(1, Ordering::Relaxed);
290 }
291 }
292
293 if is_new_min && self.config.enable_min_key_cache {
295 *self.min_key_cache.write() = Some(key);
296 }
297
298 self.version.fetch_add(1, Ordering::Release);
299 }
300
301 pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
305 if self.config.enable_min_key_cache {
307 let cache = self.min_key_cache.read();
308 if let Some(ref cached_key) = *cache {
309 let entries = self.entries.read();
310 if let Some(value) = entries.get(cached_key) {
311 return Some((cached_key.clone(), value.clone()));
312 }
313 }
314 }
315
316 let entries = self.entries.read();
318 let result = entries
319 .first_key_value()
320 .map(|(k, v)| (k.clone(), v.clone()));
321
322 if self.config.enable_min_key_cache {
324 if let Some((ref key, _)) = result {
325 *self.min_key_cache.write() = Some(key.clone());
326 }
327 }
328
329 result
330 }
331
332 pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
336 let result = {
337 let mut entries = self.entries.write();
338 entries.pop_first()
339 };
340
341 if result.is_some() {
342 self.size.fetch_sub(1, Ordering::Relaxed);
343
344 if self.config.enable_min_key_cache {
346 *self.min_key_cache.write() = None;
347 }
348
349 self.version.fetch_add(1, Ordering::Release);
350 }
351
352 result
353 }
354
355 pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
359 let result = {
360 let mut entries = self.entries.write();
361 entries.remove(key)
362 };
363
364 if result.is_some() {
365 self.size.fetch_sub(1, Ordering::Relaxed);
366
367 if self.config.enable_min_key_cache {
369 let should_invalidate = {
370 let cache = self.min_key_cache.read();
371 cache.as_ref().map(|c| c == key).unwrap_or(false)
372 };
373 if should_invalidate {
374 *self.min_key_cache.write() = None;
375 }
376 }
377
378 self.version.fetch_add(1, Ordering::Release);
379 }
380
381 result
382 }
383
384 pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
388 self.entries.read().get(key).cloned()
389 }
390
391 pub fn contains(&self, key: &CompositeQueueKey) -> bool {
395 self.entries.read().contains_key(key)
396 }
397
398 pub fn len(&self) -> usize {
402 if self.config.enable_size_tracking {
403 self.size.load(Ordering::Relaxed)
404 } else {
405 self.entries.read().len()
406 }
407 }
408
409 pub fn is_empty(&self) -> bool {
413 self.len() == 0
414 }
415
416 pub fn version(&self) -> u64 {
418 self.version.load(Ordering::Acquire)
419 }
420
421 pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
427 let entries = self.entries.read();
428
429 entries
430 .iter()
431 .take_while(|(k, _)| k.priority <= max_priority)
432 .take(limit)
433 .map(|(k, v)| (k.clone(), v.clone()))
434 .collect()
435 }
436
437 pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
441 let entries = self.entries.read();
442
443 entries
444 .iter()
445 .filter(|(k, _)| k.timestamp <= now)
446 .take(limit)
447 .map(|(k, v)| (k.clone(), v.clone()))
448 .collect()
449 }
450
451 pub fn config(&self) -> &QueueIndexConfig {
453 &self.config
454 }
455}
456
457pub struct QueueTableRegistry {
463 base: TableIndexRegistry,
465 queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
467}
468
469impl QueueTableRegistry {
470 pub fn new() -> Self {
472 Self {
473 base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
474 queue_configs: RwLock::new(std::collections::HashMap::new()),
475 }
476 }
477
478 pub fn register_queue(&self, config: QueueIndexConfig) {
480 self.base.configure_table(config.base.clone());
482
483 self.queue_configs
485 .write()
486 .insert(config.base.table_name.clone(), config);
487 }
488
489 pub fn is_queue(&self, table_name: &str) -> bool {
491 self.queue_configs.read().contains_key(table_name)
492 }
493
494 pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
496 self.queue_configs.read().get(table_name).cloned()
497 }
498
499 pub fn base(&self) -> &TableIndexRegistry {
501 &self.base
502 }
503}
504
505impl Default for QueueTableRegistry {
506 fn default() -> Self {
507 Self::new()
508 }
509}
510
511#[derive(Debug, Clone, Default)]
517pub struct QueueIndexStats {
518 pub size: usize,
520 pub inserts: u64,
522 pub pops: u64,
524 pub peeks: u64,
526 pub cache_hit_rate: f64,
528}
529
530#[cfg(test)]
535mod tests {
536 use super::*;
537
538 #[test]
539 fn test_composite_key_ordering() {
540 let k1 = CompositeQueueKey::new(1, 100, 1, "task1");
541 let k2 = CompositeQueueKey::new(2, 100, 1, "task2");
542 let k3 = CompositeQueueKey::new(1, 200, 1, "task3");
543 let k4 = CompositeQueueKey::new(1, 100, 2, "task4");
544
545 assert!(k1 < k2);
547
548 assert!(k1 < k3);
550
551 assert!(k1 < k4);
553 }
554
555 #[test]
556 fn test_composite_key_encode_decode() {
557 let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");
558 let encoded = original.encode();
559 let decoded = CompositeQueueKey::decode(&encoded).unwrap();
560
561 assert_eq!(decoded.priority, original.priority);
562 assert_eq!(decoded.timestamp, original.timestamp);
563 assert_eq!(decoded.sequence, original.sequence);
564 assert_eq!(decoded.task_id, original.task_id);
565 }
566
567 #[test]
568 fn test_queue_index_insert_pop() {
569 let config = QueueIndexConfig::new("test_queue");
570 let index: QueueIndex<String> = QueueIndex::new(config);
571
572 index.insert(
574 CompositeQueueKey::new(3, 100, 1, "low"),
575 "low priority".to_string(),
576 );
577 index.insert(
578 CompositeQueueKey::new(1, 100, 1, "high"),
579 "high priority".to_string(),
580 );
581 index.insert(
582 CompositeQueueKey::new(2, 100, 1, "medium"),
583 "medium priority".to_string(),
584 );
585
586 assert_eq!(index.len(), 3);
587
588 let (key, value) = index.pop_min().unwrap();
590 assert_eq!(key.priority, 1);
591 assert_eq!(value, "high priority");
592
593 let (key, _) = index.pop_min().unwrap();
594 assert_eq!(key.priority, 2);
595
596 let (key, _) = index.pop_min().unwrap();
597 assert_eq!(key.priority, 3);
598
599 assert!(index.is_empty());
600 }
601
602 #[test]
603 fn test_queue_index_peek() {
604 let config = QueueIndexConfig::new("test_queue");
605 let index: QueueIndex<i32> = QueueIndex::new(config);
606
607 index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
608 index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);
609
610 let (key, value) = index.peek_min().unwrap();
612 assert_eq!(key.priority, 1);
613 assert_eq!(value, 2);
614
615 assert_eq!(index.len(), 2);
617
618 let (key, _) = index.peek_min().unwrap();
620 assert_eq!(key.priority, 1);
621 }
622
623 #[test]
624 fn test_queue_index_remove() {
625 let config = QueueIndexConfig::new("test_queue");
626 let index: QueueIndex<i32> = QueueIndex::new(config);
627
628 let key1 = CompositeQueueKey::new(1, 100, 1, "task1");
629 let key2 = CompositeQueueKey::new(2, 100, 1, "task2");
630
631 index.insert(key1.clone(), 1);
632 index.insert(key2.clone(), 2);
633
634 let removed = index.remove(&key1);
636 assert_eq!(removed, Some(1));
637 assert_eq!(index.len(), 1);
638
639 let (key, _) = index.pop_min().unwrap();
641 assert_eq!(key.task_id, "task2");
642 }
643
644 #[test]
645 fn test_scan_by_priority() {
646 let config = QueueIndexConfig::new("test_queue");
647 let index: QueueIndex<i32> = QueueIndex::new(config);
648
649 for i in 1..=10 {
650 index.insert(
651 CompositeQueueKey::new(i, 100, 1, format!("task{}", i)),
652 i as i32,
653 );
654 }
655
656 let results = index.scan_by_priority(3, 100);
658 assert_eq!(results.len(), 3);
659 assert_eq!(results[0].0.priority, 1);
660 assert_eq!(results[1].0.priority, 2);
661 assert_eq!(results[2].0.priority, 3);
662 }
663
664 #[test]
665 fn test_scan_ready() {
666 let config = QueueIndexConfig::new("test_queue");
667 let index: QueueIndex<i32> = QueueIndex::new(config);
668
669 index.insert(CompositeQueueKey::new(1, 100, 1, "ready1"), 1);
671 index.insert(CompositeQueueKey::new(1, 200, 1, "ready2"), 2);
672 index.insert(CompositeQueueKey::new(1, 300, 1, "future"), 3);
673
674 let results = index.scan_ready(200, 100);
676 assert_eq!(results.len(), 2);
677 }
678
679 #[test]
680 fn test_queue_registry() {
681 let registry = QueueTableRegistry::new();
682
683 let queue_config = QueueIndexConfig::new("task_queue")
684 .with_priority_column("priority")
685 .with_timestamp_column("ready_at");
686
687 registry.register_queue(queue_config);
688
689 assert!(registry.is_queue("task_queue"));
690 assert!(!registry.is_queue("regular_table"));
691
692 let config = registry.get_queue_config("task_queue").unwrap();
693 assert_eq!(config.priority_column, Some("priority".to_string()));
694 }
695
696 #[test]
697 fn test_fifo_within_priority() {
698 let config = QueueIndexConfig::new("test_queue");
699 let index: QueueIndex<String> = QueueIndex::new(config);
700
701 index.insert(
703 CompositeQueueKey::new(1, 100, 3, "third"),
704 "third".to_string(),
705 );
706 index.insert(
707 CompositeQueueKey::new(1, 100, 1, "first"),
708 "first".to_string(),
709 );
710 index.insert(
711 CompositeQueueKey::new(1, 100, 2, "second"),
712 "second".to_string(),
713 );
714
715 let (_, v1) = index.pop_min().unwrap();
717 let (_, v2) = index.pop_min().unwrap();
718 let (_, v3) = index.pop_min().unwrap();
719
720 assert_eq!(v1, "first");
721 assert_eq!(v2, "second");
722 assert_eq!(v3, "third");
723 }
724}