use crate::scheduler::job::{IndexJob, JobKind, QueueKind};
use std::collections::VecDeque;
pub struct BoundedQueue {
kind: QueueKind,
capacity: usize,
items: VecDeque<IndexJob>,
total_pushed: u64,
pub backpressure_active: bool,
}
impl BoundedQueue {
pub fn new(kind: QueueKind, capacity: usize) -> Self {
Self {
kind,
capacity,
items: VecDeque::new(),
total_pushed: 0,
backpressure_active: false,
}
}
pub fn kind(&self) -> QueueKind {
self.kind
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
pub fn is_full(&self) -> bool {
self.items.len() >= self.capacity
}
pub fn remaining(&self) -> usize {
self.capacity.saturating_sub(self.items.len())
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn total_pushed(&self) -> u64 {
self.total_pushed
}
pub fn push(&mut self, job: IndexJob) {
assert!(
!self.is_full(),
"BoundedQueue::push called on a full queue ({:?})",
self.kind
);
let len = self.items.len();
let pos = if len == 0 {
0
} else {
let mut insert_at = 0; for i in (0..len).rev() {
if self.items[i].priority < job.priority {
insert_at = i + 1;
break;
}
}
insert_at
};
self.items.insert(pos, job);
self.total_pushed += 1;
}
pub fn pop(&mut self) -> Option<IndexJob> {
self.items.pop_back()
}
pub fn peek(&self) -> Option<&IndexJob> {
self.items.back()
}
pub fn cancel_for_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
let before = self.items.len();
self.items.retain(|j| &j.source_id != source_id);
before - self.items.len()
}
pub fn clear(&mut self) -> usize {
let n = self.items.len();
self.items.clear();
n
}
}
pub struct QueueSet {
pub scan: BoundedQueue,
pub extract: BoundedQueue,
pub chunk: BoundedQueue,
pub keyword: BoundedQueue,
pub embedding: BoundedQueue,
pub maintenance: BoundedQueue,
}
impl QueueSet {
pub fn new(capacity: &crate::scheduler::limits::QueueCapacity) -> Self {
Self {
scan: BoundedQueue::new(QueueKind::Scan, capacity.scan_queue_max),
extract: BoundedQueue::new(QueueKind::Extract, capacity.extract_queue_max),
chunk: BoundedQueue::new(QueueKind::Chunk, capacity.chunk_queue_max),
keyword: BoundedQueue::new(QueueKind::Keyword, capacity.keyword_queue_max),
embedding: BoundedQueue::new(QueueKind::Embedding, capacity.embedding_queue_max),
maintenance: BoundedQueue::new(QueueKind::Maintenance, capacity.maintenance_queue_max),
}
}
pub fn queue_for(&mut self, kind: JobKind) -> &mut BoundedQueue {
match kind {
JobKind::ScanSource => &mut self.scan,
JobKind::ExtractFile => &mut self.extract,
JobKind::ChunkFile => &mut self.chunk,
JobKind::UpdateKeywordIndex => &mut self.keyword,
JobKind::GenerateEmbedding => &mut self.embedding,
JobKind::Cleanup | JobKind::Repair => &mut self.maintenance,
}
}
pub fn total_pending(&self) -> usize {
self.scan.len()
+ self.extract.len()
+ self.chunk.len()
+ self.keyword.len()
+ self.embedding.len()
+ self.maintenance.len()
}
pub fn cancel_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
self.scan.cancel_for_source(source_id)
+ self.extract.cancel_for_source(source_id)
+ self.chunk.cancel_for_source(source_id)
+ self.keyword.cancel_for_source(source_id)
+ self.embedding.cancel_for_source(source_id)
+ self.maintenance.cancel_for_source(source_id)
}
pub fn pop_next(&mut self, resource_mode: super::job::ResourceMode) -> Option<IndexJob> {
use super::job::ResourceMode;
let queues: &mut [&mut BoundedQueue] = &mut [
&mut self.scan,
&mut self.extract,
&mut self.chunk,
&mut self.keyword,
&mut self.embedding,
&mut self.maintenance,
];
for q in queues.iter_mut() {
if q.kind() == QueueKind::Embedding && resource_mode == ResourceMode::UserActive {
continue; }
if let Some(job) = q.pop() {
return Some(job);
}
}
None
}
}