orbok_workers/scheduler/queue.rs
1//! Bounded work queues with backpressure (RFC-036 §7, §10).
2//!
//! Each `BoundedQueue` holds at most `capacity` items. Callers must
4//! check `is_full` before pushing; the scheduler applies upstream
5//! backpressure when any downstream queue reports full.
6//!
7//! This implementation is synchronous and single-threaded, matching
8//! the existing orbok-workers execution model. Async channels can be
9//! layered in a future RFC without changing this API.
10
11use crate::scheduler::job::{IndexJob, JobKind, QueueKind};
12use std::collections::VecDeque;
13
14/// A single bounded queue of `IndexJob`s, ordered by priority then
15/// insertion order (RFC-036 §8).
16pub struct BoundedQueue {
17 kind: QueueKind,
18 capacity: usize,
19 items: VecDeque<IndexJob>,
20 /// Total jobs ever pushed (for progress reporting).
21 total_pushed: u64,
22 /// Whether backpressure is currently active for this queue.
23 pub backpressure_active: bool,
24}
25
26impl BoundedQueue {
27 pub fn new(kind: QueueKind, capacity: usize) -> Self {
28 Self {
29 kind,
30 capacity,
31 items: VecDeque::new(),
32 total_pushed: 0,
33 backpressure_active: false,
34 }
35 }
36
37 pub fn kind(&self) -> QueueKind {
38 self.kind
39 }
40
41 pub fn len(&self) -> usize {
42 self.items.len()
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.items.is_empty()
47 }
48
49 /// `true` when the queue has reached its capacity ceiling.
50 /// Callers must not push when this returns `true`.
51 pub fn is_full(&self) -> bool {
52 self.items.len() >= self.capacity
53 }
54
55 /// Remaining capacity.
56 pub fn remaining(&self) -> usize {
57 self.capacity.saturating_sub(self.items.len())
58 }
59
60 pub fn capacity(&self) -> usize {
61 self.capacity
62 }
63
64 pub fn total_pushed(&self) -> u64 {
65 self.total_pushed
66 }
67
68 /// Push a job into the queue, maintaining priority order.
69 ///
70 /// ## Ordering invariant
71 ///
72 /// `pop_back` serves the item at the **highest index**. To dispatch
73 /// the highest-priority job first, and within equal priority the
74 /// *oldest* job first (FIFO), items must be stored with:
75 ///
76 /// - higher-priority items at higher indices (closer to back), and
77 /// - among equal-priority items, older items at higher indices.
78 ///
79 /// ## Insertion rule
80 ///
81 /// The new job is inserted at the first index (from the back) where
82 /// the existing item has strictly lower priority than the new job.
83 /// This places it after (higher index than) all equal-or-higher
84 /// priority items already in the queue, satisfying both conditions.
85 ///
86 /// Panics if the queue is full — callers must check `is_full` first.
87 pub fn push(&mut self, job: IndexJob) {
88 assert!(
89 !self.is_full(),
90 "BoundedQueue::push called on a full queue ({:?})",
91 self.kind
92 );
93 // Find insertion point: scan from back to front, find the first
94 // item with strictly lower priority. Insert just after it (i.e.
95 // at index i+1, which is one step toward the back from i).
96 // If all existing items have >= priority (or queue is empty),
97 // insert at the front (index 0) so the new job is served last
98 // among equal-priority items.
99 let len = self.items.len();
100 let pos = if len == 0 {
101 0
102 } else {
103 // Start from the back (highest priority end).
104 let mut insert_at = 0; // default: insert at front
105 for i in (0..len).rev() {
106 if self.items[i].priority < job.priority {
107 // items[i] has equal or lower priority than the new
108 // job. The new job goes just after it (toward back).
109 insert_at = i + 1;
110 break;
111 }
112 // items[i].priority > job.priority: new job must go
113 // further toward the front; continue scanning.
114 }
115 insert_at
116 };
117 self.items.insert(pos, job);
118 self.total_pushed += 1;
119 }
120
121 /// Pop the highest-priority job (front of the queue).
122 pub fn pop(&mut self) -> Option<IndexJob> {
123 self.items.pop_back()
124 }
125
126 /// Peek at the highest-priority job without removing it.
127 pub fn peek(&self) -> Option<&IndexJob> {
128 self.items.back()
129 }
130
131 /// Cancel all jobs belonging to `source_id` (RFC-036 §12.3).
132 /// Returns the number of jobs removed.
133 pub fn cancel_for_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
134 let before = self.items.len();
135 self.items.retain(|j| &j.source_id != source_id);
136 before - self.items.len()
137 }
138
139 /// Drain all jobs (e.g. on full scheduler reset).
140 pub fn clear(&mut self) -> usize {
141 let n = self.items.len();
142 self.items.clear();
143 n
144 }
145}
146
147// ── Multi-queue set ───────────────────────────────────────────────────────
148
149/// The complete set of bounded queues for the scheduler (RFC-036 §7).
150pub struct QueueSet {
151 pub scan: BoundedQueue,
152 pub extract: BoundedQueue,
153 pub chunk: BoundedQueue,
154 pub keyword: BoundedQueue,
155 pub embedding: BoundedQueue,
156 pub maintenance: BoundedQueue,
157}
158
159impl QueueSet {
160 pub fn new(capacity: &crate::scheduler::limits::QueueCapacity) -> Self {
161 Self {
162 scan: BoundedQueue::new(QueueKind::Scan, capacity.scan_queue_max),
163 extract: BoundedQueue::new(QueueKind::Extract, capacity.extract_queue_max),
164 chunk: BoundedQueue::new(QueueKind::Chunk, capacity.chunk_queue_max),
165 keyword: BoundedQueue::new(QueueKind::Keyword, capacity.keyword_queue_max),
166 embedding: BoundedQueue::new(QueueKind::Embedding, capacity.embedding_queue_max),
167 maintenance: BoundedQueue::new(QueueKind::Maintenance, capacity.maintenance_queue_max),
168 }
169 }
170
171 /// Route a job to its natural queue by kind (RFC-036 §6).
172 pub fn queue_for(&mut self, kind: JobKind) -> &mut BoundedQueue {
173 match kind {
174 JobKind::ScanSource => &mut self.scan,
175 JobKind::ExtractFile => &mut self.extract,
176 JobKind::ChunkFile => &mut self.chunk,
177 JobKind::UpdateKeywordIndex => &mut self.keyword,
178 JobKind::GenerateEmbedding => &mut self.embedding,
179 JobKind::Cleanup | JobKind::Repair => &mut self.maintenance,
180 }
181 }
182
183 /// Total pending jobs across all queues.
184 pub fn total_pending(&self) -> usize {
185 self.scan.len()
186 + self.extract.len()
187 + self.chunk.len()
188 + self.keyword.len()
189 + self.embedding.len()
190 + self.maintenance.len()
191 }
192
193 /// Cancel all queued jobs for a source (RFC-036 §12.3).
194 pub fn cancel_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
195 self.scan.cancel_for_source(source_id)
196 + self.extract.cancel_for_source(source_id)
197 + self.chunk.cancel_for_source(source_id)
198 + self.keyword.cancel_for_source(source_id)
199 + self.embedding.cancel_for_source(source_id)
200 + self.maintenance.cancel_for_source(source_id)
201 }
202
203 /// Pop the next job to run, respecting resource mode (RFC-036 §8, §13).
204 ///
205 /// In `UserActive` mode, embedding is skipped so search is never
206 /// delayed (RFC-036 §9.2 embedding rule).
207 pub fn pop_next(&mut self, resource_mode: super::job::ResourceMode) -> Option<IndexJob> {
208 use super::job::ResourceMode;
209
210 // Priority order: scan → extract → chunk → keyword → embedding →
211 // maintenance. Embedding is skipped entirely in UserActive mode.
212 let queues: &mut [&mut BoundedQueue] = &mut [
213 &mut self.scan,
214 &mut self.extract,
215 &mut self.chunk,
216 &mut self.keyword,
217 &mut self.embedding,
218 &mut self.maintenance,
219 ];
220
221 for q in queues.iter_mut() {
222 if q.kind() == QueueKind::Embedding && resource_mode == ResourceMode::UserActive {
223 continue; // RFC-036 §9.2: yield embedding to active search.
224 }
225 if let Some(job) = q.pop() {
226 return Some(job);
227 }
228 }
229 None
230 }
231}