1use std::collections::BTreeMap;
70use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
71use std::sync::Arc;
72
73use parking_lot::RwLock;
74
75use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
76use crate::key_buffer::ArenaKeyHandle;
77
78#[derive(Debug, Clone)]
84pub struct QueueIndexConfig {
85 pub base: TableIndexConfig,
87 pub priority_column: Option<String>,
89 pub timestamp_column: Option<String>,
91 pub fifo_column: Option<String>,
93 pub enable_min_key_cache: bool,
95 pub enable_size_tracking: bool,
97}
98
99impl QueueIndexConfig {
100 pub fn new(queue_name: impl Into<String>) -> Self {
102 Self {
103 base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
104 priority_column: None,
105 timestamp_column: None,
106 fifo_column: None,
107 enable_min_key_cache: true,
108 enable_size_tracking: true,
109 }
110 }
111
112 pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
114 self.priority_column = Some(column.into());
115 self
116 }
117
118 pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
120 self.timestamp_column = Some(column.into());
121 self
122 }
123
124 pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
126 self.fifo_column = Some(column.into());
127 self
128 }
129
130 pub fn with_min_key_cache(mut self, enable: bool) -> Self {
132 self.enable_min_key_cache = enable;
133 self
134 }
135
136 pub fn with_size_tracking(mut self, enable: bool) -> Self {
138 self.enable_size_tracking = enable;
139 self
140 }
141
142 pub fn key_columns(&self) -> Vec<&str> {
144 let mut columns = Vec::new();
145 if let Some(ref col) = self.priority_column {
146 columns.push(col.as_str());
147 }
148 if let Some(ref col) = self.timestamp_column {
149 columns.push(col.as_str());
150 }
151 if let Some(ref col) = self.fifo_column {
152 columns.push(col.as_str());
153 }
154 columns
155 }
156}
157
158pub struct QueueIndex<V: Clone + Send + Sync> {
182 entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
184 min_key_cache: RwLock<Option<CompositeQueueKey>>,
186 size: AtomicUsize,
188 version: AtomicU64,
190 config: QueueIndexConfig,
192}
193
/// Ordering key for queue entries.
///
/// The derived `Ord` compares the fields in declaration order: `priority`,
/// then `timestamp`, then `sequence`, and finally `task_id`. Smaller keys sort
/// first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    /// Primary sort field; numerically smaller values sort first.
    pub priority: i64,
    /// Second sort field.
    pub timestamp: u64,
    /// Third sort field, breaking ties between equal priority and timestamp.
    pub sequence: u64,
    /// Task identifier; last sort field and trailing part of the encoding.
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Number of bytes taken by the three fixed-width fields in the encoding.
    const HEADER_LEN: usize = 24;

    /// Flipping this bit converts between an `i64` and its order-preserving
    /// unsigned form (`i64::MIN` <-> `0`, `i64::MAX` <-> `u64::MAX`).
    const SIGN_BIT: u64 = 1 << 63;

    /// Assembles a key from its four components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serialises the key to bytes.
    ///
    /// Layout: 8 bytes of sign-biased priority, 8 bytes of timestamp, 8 bytes
    /// of sequence (all big-endian), followed by the raw UTF-8 bytes of
    /// `task_id`.
    pub fn encode(&self) -> Vec<u8> {
        let id_bytes = self.task_id.as_bytes();
        let biased_priority = (self.priority as u64) ^ Self::SIGN_BIT;

        // Reserves a little more than the 24-byte header needs.
        let mut out = Vec::with_capacity(32 + id_bytes.len());
        for word in [biased_priority, self.timestamp, self.sequence] {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(id_bytes);
        out
    }

    /// Parses bytes produced by [`encode`](Self::encode).
    ///
    /// Returns `None` when fewer than 24 bytes are supplied or when the bytes
    /// after the header are not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::HEADER_LEN {
            return None;
        }
        let (header, id_bytes) = bytes.split_at(Self::HEADER_LEN);

        // Reads the `index`-th big-endian u64 out of the fixed-width header.
        let word = |index: usize| -> u64 {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(&header[index * 8..(index + 1) * 8]);
            u64::from_be_bytes(raw)
        };

        let task_id = std::str::from_utf8(id_bytes).ok()?.to_owned();

        Some(Self {
            priority: (word(0) ^ Self::SIGN_BIT) as i64,
            timestamp: word(1),
            sequence: word(2),
            task_id,
        })
    }
}
261
262impl<V: Clone + Send + Sync> QueueIndex<V> {
263 pub fn new(config: QueueIndexConfig) -> Self {
265 Self {
266 entries: RwLock::new(BTreeMap::new()),
267 min_key_cache: RwLock::new(None),
268 size: AtomicUsize::new(0),
269 version: AtomicU64::new(0),
270 config,
271 }
272 }
273
274 pub fn insert(&self, key: CompositeQueueKey, value: V) {
278 let is_new_min = {
279 let entries = self.entries.read();
280 entries.first_key_value()
281 .map(|(min, _)| &key < min)
282 .unwrap_or(true)
283 };
284
285 {
286 let mut entries = self.entries.write();
287 let was_absent = entries.insert(key.clone(), value).is_none();
288
289 if was_absent {
290 self.size.fetch_add(1, Ordering::Relaxed);
291 }
292 }
293
294 if is_new_min && self.config.enable_min_key_cache {
296 *self.min_key_cache.write() = Some(key);
297 }
298
299 self.version.fetch_add(1, Ordering::Release);
300 }
301
302 pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
306 if self.config.enable_min_key_cache {
308 let cache = self.min_key_cache.read();
309 if let Some(ref cached_key) = *cache {
310 let entries = self.entries.read();
311 if let Some(value) = entries.get(cached_key) {
312 return Some((cached_key.clone(), value.clone()));
313 }
314 }
315 }
316
317 let entries = self.entries.read();
319 let result = entries.first_key_value()
320 .map(|(k, v)| (k.clone(), v.clone()));
321
322 if self.config.enable_min_key_cache {
324 if let Some((ref key, _)) = result {
325 *self.min_key_cache.write() = Some(key.clone());
326 }
327 }
328
329 result
330 }
331
332 pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
336 let result = {
337 let mut entries = self.entries.write();
338 entries.pop_first()
339 };
340
341 if result.is_some() {
342 self.size.fetch_sub(1, Ordering::Relaxed);
343
344 if self.config.enable_min_key_cache {
346 *self.min_key_cache.write() = None;
347 }
348
349 self.version.fetch_add(1, Ordering::Release);
350 }
351
352 result
353 }
354
355 pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
359 let result = {
360 let mut entries = self.entries.write();
361 entries.remove(key)
362 };
363
364 if result.is_some() {
365 self.size.fetch_sub(1, Ordering::Relaxed);
366
367 if self.config.enable_min_key_cache {
369 let should_invalidate = {
370 let cache = self.min_key_cache.read();
371 cache.as_ref().map(|c| c == key).unwrap_or(false)
372 };
373 if should_invalidate {
374 *self.min_key_cache.write() = None;
375 }
376 }
377
378 self.version.fetch_add(1, Ordering::Release);
379 }
380
381 result
382 }
383
384 pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
388 self.entries.read().get(key).cloned()
389 }
390
391 pub fn contains(&self, key: &CompositeQueueKey) -> bool {
395 self.entries.read().contains_key(key)
396 }
397
398 pub fn len(&self) -> usize {
402 if self.config.enable_size_tracking {
403 self.size.load(Ordering::Relaxed)
404 } else {
405 self.entries.read().len()
406 }
407 }
408
409 pub fn is_empty(&self) -> bool {
413 self.len() == 0
414 }
415
416 pub fn version(&self) -> u64 {
418 self.version.load(Ordering::Acquire)
419 }
420
421 pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
427 let entries = self.entries.read();
428
429 entries.iter()
430 .take_while(|(k, _)| k.priority <= max_priority)
431 .take(limit)
432 .map(|(k, v)| (k.clone(), v.clone()))
433 .collect()
434 }
435
436 pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
440 let entries = self.entries.read();
441
442 entries.iter()
443 .filter(|(k, _)| k.timestamp <= now)
444 .take(limit)
445 .map(|(k, v)| (k.clone(), v.clone()))
446 .collect()
447 }
448
449 pub fn config(&self) -> &QueueIndexConfig {
451 &self.config
452 }
453}
454
455pub struct QueueTableRegistry {
461 base: TableIndexRegistry,
463 queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
465}
466
467impl QueueTableRegistry {
468 pub fn new() -> Self {
470 Self {
471 base: TableIndexRegistry::with_default_policy(IndexPolicy::Balanced),
472 queue_configs: RwLock::new(std::collections::HashMap::new()),
473 }
474 }
475
476 pub fn register_queue(&self, config: QueueIndexConfig) {
478 self.base.configure_table(config.base.clone());
480
481 self.queue_configs.write().insert(
483 config.base.table_name.clone(),
484 config,
485 );
486 }
487
488 pub fn is_queue(&self, table_name: &str) -> bool {
490 self.queue_configs.read().contains_key(table_name)
491 }
492
493 pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
495 self.queue_configs.read().get(table_name).cloned()
496 }
497
498 pub fn base(&self) -> &TableIndexRegistry {
500 &self.base
501 }
502}
503
504impl Default for QueueTableRegistry {
505 fn default() -> Self {
506 Self::new()
507 }
508}
509
/// Plain container for queue index statistics.
///
/// Every field defaults to zero. Nothing in the visible part of this file
/// fills the struct in, so the field descriptions below are inferred from the
/// field names only — confirm against the code that produces these values.
#[derive(Clone, Debug, Default)]
pub struct QueueIndexStats {
    /// Entry count (presumably at the time the statistics were taken).
    pub size: usize,
    /// Insert counter (presumably a running total).
    pub inserts: u64,
    /// Pop counter (presumably a running total).
    pub pops: u64,
    /// Peek counter (presumably a running total).
    pub peeks: u64,
    /// Cache hit rate; the scale (fraction vs. percent) is not established
    /// here — TODO confirm.
    pub cache_hit_rate: f64,
}
528
// Unit tests for the composite key, the queue index, and the queue registry.
#[cfg(test)]
mod tests {
    use super::*;

    /// Keys compare field by field: priority, then timestamp, then sequence.
    #[test]
    fn test_composite_key_ordering() {
        let k1 = CompositeQueueKey::new(1, 100, 1, "task1");
        let k2 = CompositeQueueKey::new(2, 100, 1, "task2");
        let k3 = CompositeQueueKey::new(1, 200, 1, "task3");
        let k4 = CompositeQueueKey::new(1, 100, 2, "task4");

        // Smaller priority sorts first.
        assert!(k1 < k2);

        // Same priority: smaller timestamp sorts first.
        assert!(k1 < k3);

        // Same priority and timestamp: smaller sequence sorts first.
        assert!(k1 < k4);
    }

    /// Encoding then decoding yields the original fields, including a
    /// negative priority.
    #[test]
    fn test_composite_key_encode_decode() {
        let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");
        let encoded = original.encode();
        let decoded = CompositeQueueKey::decode(&encoded).unwrap();

        assert_eq!(decoded.priority, original.priority);
        assert_eq!(decoded.timestamp, original.timestamp);
        assert_eq!(decoded.sequence, original.sequence);
        assert_eq!(decoded.task_id, original.task_id);
    }

    /// Entries inserted out of order are popped in ascending priority order.
    #[test]
    fn test_queue_index_insert_pop() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<String> = QueueIndex::new(config);

        // Deliberately inserted in non-sorted order.
        index.insert(CompositeQueueKey::new(3, 100, 1, "low"), "low priority".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "high"), "high priority".to_string());
        index.insert(CompositeQueueKey::new(2, 100, 1, "medium"), "medium priority".to_string());

        assert_eq!(index.len(), 3);

        // The smallest priority value comes out first.
        let (key, value) = index.pop_min().unwrap();
        assert_eq!(key.priority, 1);
        assert_eq!(value, "high priority");

        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.priority, 2);

        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.priority, 3);

        assert!(index.is_empty());
    }

    /// Peeking returns the minimum without removing it, and is repeatable.
    #[test]
    fn test_queue_index_peek() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
        index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);

        // The second insert has the smaller priority, so it is the minimum.
        let (key, value) = index.peek_min().unwrap();
        assert_eq!(key.priority, 1);
        assert_eq!(value, 2);

        // Peeking must not change the size.
        assert_eq!(index.len(), 2);

        // A second peek sees the same minimum.
        let (key, _) = index.peek_min().unwrap();
        assert_eq!(key.priority, 1);
    }

    /// Removing a specific key returns its value and leaves the rest intact.
    #[test]
    fn test_queue_index_remove() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        let key1 = CompositeQueueKey::new(1, 100, 1, "task1");
        let key2 = CompositeQueueKey::new(2, 100, 1, "task2");

        index.insert(key1.clone(), 1);
        index.insert(key2.clone(), 2);

        // key1 is the current minimum; remove it by key rather than by pop.
        let removed = index.remove(&key1);
        assert_eq!(removed, Some(1));
        assert_eq!(index.len(), 1);

        // The remaining entry is now the minimum.
        let (key, _) = index.pop_min().unwrap();
        assert_eq!(key.task_id, "task2");
    }

    /// The priority scan returns only entries at or below the threshold,
    /// in ascending order.
    #[test]
    fn test_scan_by_priority() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        // Priorities 1 through 10, one entry each.
        for i in 1..=10 {
            index.insert(CompositeQueueKey::new(i, 100, 1, format!("task{}", i)), i as i32);
        }

        // Threshold 3 with a generous limit: expect priorities 1, 2 and 3.
        let results = index.scan_by_priority(3, 100);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].0.priority, 1);
        assert_eq!(results[1].0.priority, 2);
        assert_eq!(results[2].0.priority, 3);
    }

    /// The ready scan returns only entries whose timestamp is at or before
    /// `now`.
    #[test]
    fn test_scan_ready() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<i32> = QueueIndex::new(config);

        // Timestamps 100 and 200 are at or before now=200; 300 is after it.
        index.insert(CompositeQueueKey::new(1, 100, 1, "ready1"), 1);
        index.insert(CompositeQueueKey::new(1, 200, 1, "ready2"), 2);
        index.insert(CompositeQueueKey::new(1, 300, 1, "future"), 3);

        // The comparison is inclusive, so the entry at exactly 200 counts.
        let results = index.scan_ready(200, 100);
        assert_eq!(results.len(), 2);
    }

    /// A registered queue is reported as a queue and its config can be read
    /// back; an unregistered table is not a queue.
    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();

        let queue_config = QueueIndexConfig::new("task_queue")
            .with_priority_column("priority")
            .with_timestamp_column("ready_at");

        registry.register_queue(queue_config);

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let config = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(config.priority_column, Some("priority".to_string()));
    }

    /// With equal priority and timestamp, entries pop in ascending sequence
    /// order regardless of insertion order.
    #[test]
    fn test_fifo_within_priority() {
        let config = QueueIndexConfig::new("test_queue");
        let index: QueueIndex<String> = QueueIndex::new(config);

        // Same priority and timestamp; only the sequence differs.
        index.insert(CompositeQueueKey::new(1, 100, 3, "third"), "third".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 1, "first"), "first".to_string());
        index.insert(CompositeQueueKey::new(1, 100, 2, "second"), "second".to_string());

        // Pops follow the sequence number, not the insertion order.
        let (_, v1) = index.pop_min().unwrap();
        let (_, v2) = index.pop_min().unwrap();
        let (_, v3) = index.pop_min().unwrap();

        assert_eq!(v1, "first");
        assert_eq!(v2, "second");
        assert_eq!(v3, "third");
    }
}