Skip to main content

orbok_workers/scheduler/
queue.rs

1//! Bounded work queues with backpressure (RFC-036 §7, §10).
2//!
//! Each `BoundedQueue` holds at most `capacity` jobs. Callers must
//! check `is_full` before pushing; the scheduler applies upstream
//! backpressure when any downstream queue reports full.
6//!
7//! This implementation is synchronous and single-threaded, matching
8//! the existing orbok-workers execution model. Async channels can be
9//! layered in a future RFC without changing this API.
10
11use crate::scheduler::job::{IndexJob, JobKind, QueueKind};
12use std::collections::VecDeque;
13
14/// A single bounded queue of `IndexJob`s, ordered by priority then
15/// insertion order (RFC-036 §8).
16pub struct BoundedQueue {
17    kind: QueueKind,
18    capacity: usize,
19    items: VecDeque<IndexJob>,
20    /// Total jobs ever pushed (for progress reporting).
21    total_pushed: u64,
22    /// Whether backpressure is currently active for this queue.
23    pub backpressure_active: bool,
24}
25
26impl BoundedQueue {
27    pub fn new(kind: QueueKind, capacity: usize) -> Self {
28        Self {
29            kind,
30            capacity,
31            items: VecDeque::new(),
32            total_pushed: 0,
33            backpressure_active: false,
34        }
35    }
36
37    pub fn kind(&self) -> QueueKind {
38        self.kind
39    }
40
41    pub fn len(&self) -> usize {
42        self.items.len()
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.items.is_empty()
47    }
48
49    /// `true` when the queue has reached its capacity ceiling.
50    /// Callers must not push when this returns `true`.
51    pub fn is_full(&self) -> bool {
52        self.items.len() >= self.capacity
53    }
54
55    /// Remaining capacity.
56    pub fn remaining(&self) -> usize {
57        self.capacity.saturating_sub(self.items.len())
58    }
59
60    pub fn capacity(&self) -> usize {
61        self.capacity
62    }
63
64    pub fn total_pushed(&self) -> u64 {
65        self.total_pushed
66    }
67
68    /// Push a job into the queue, maintaining priority order.
69    ///
70    /// ## Ordering invariant
71    ///
72    /// `pop_back` serves the item at the **highest index**. To dispatch
73    /// the highest-priority job first, and within equal priority the
74    /// *oldest* job first (FIFO), items must be stored with:
75    ///
76    /// - higher-priority items at higher indices (closer to back), and
77    /// - among equal-priority items, older items at higher indices.
78    ///
79    /// ## Insertion rule
80    ///
81    /// The new job is inserted at the first index (from the back) where
82    /// the existing item has strictly lower priority than the new job.
83    /// This places it after (higher index than) all equal-or-higher
84    /// priority items already in the queue, satisfying both conditions.
85    ///
86    /// Panics if the queue is full — callers must check `is_full` first.
87    pub fn push(&mut self, job: IndexJob) {
88        assert!(
89            !self.is_full(),
90            "BoundedQueue::push called on a full queue ({:?})",
91            self.kind
92        );
93        // Find insertion point: scan from back to front, find the first
94        // item with strictly lower priority. Insert just after it (i.e.
95        // at index i+1, which is one step toward the back from i).
96        // If all existing items have >= priority (or queue is empty),
97        // insert at the front (index 0) so the new job is served last
98        // among equal-priority items.
99        let len = self.items.len();
100        let pos = if len == 0 {
101            0
102        } else {
103            // Start from the back (highest priority end).
104            let mut insert_at = 0; // default: insert at front
105            for i in (0..len).rev() {
106                if self.items[i].priority < job.priority {
107                    // items[i] has equal or lower priority than the new
108                    // job. The new job goes just after it (toward back).
109                    insert_at = i + 1;
110                    break;
111                }
112                // items[i].priority > job.priority: new job must go
113                // further toward the front; continue scanning.
114            }
115            insert_at
116        };
117        self.items.insert(pos, job);
118        self.total_pushed += 1;
119    }
120
121    /// Pop the highest-priority job (front of the queue).
122    pub fn pop(&mut self) -> Option<IndexJob> {
123        self.items.pop_back()
124    }
125
126    /// Peek at the highest-priority job without removing it.
127    pub fn peek(&self) -> Option<&IndexJob> {
128        self.items.back()
129    }
130
131    /// Cancel all jobs belonging to `source_id` (RFC-036 §12.3).
132    /// Returns the number of jobs removed.
133    pub fn cancel_for_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
134        let before = self.items.len();
135        self.items.retain(|j| &j.source_id != source_id);
136        before - self.items.len()
137    }
138
139    /// Drain all jobs (e.g. on full scheduler reset).
140    pub fn clear(&mut self) -> usize {
141        let n = self.items.len();
142        self.items.clear();
143        n
144    }
145}
146
147// ── Multi-queue set ───────────────────────────────────────────────────────
148
149/// The complete set of bounded queues for the scheduler (RFC-036 §7).
150pub struct QueueSet {
151    pub scan: BoundedQueue,
152    pub extract: BoundedQueue,
153    pub chunk: BoundedQueue,
154    pub keyword: BoundedQueue,
155    pub embedding: BoundedQueue,
156    pub maintenance: BoundedQueue,
157}
158
159impl QueueSet {
160    pub fn new(capacity: &crate::scheduler::limits::QueueCapacity) -> Self {
161        Self {
162            scan: BoundedQueue::new(QueueKind::Scan, capacity.scan_queue_max),
163            extract: BoundedQueue::new(QueueKind::Extract, capacity.extract_queue_max),
164            chunk: BoundedQueue::new(QueueKind::Chunk, capacity.chunk_queue_max),
165            keyword: BoundedQueue::new(QueueKind::Keyword, capacity.keyword_queue_max),
166            embedding: BoundedQueue::new(QueueKind::Embedding, capacity.embedding_queue_max),
167            maintenance: BoundedQueue::new(QueueKind::Maintenance, capacity.maintenance_queue_max),
168        }
169    }
170
171    /// Route a job to its natural queue by kind (RFC-036 §6).
172    pub fn queue_for(&mut self, kind: JobKind) -> &mut BoundedQueue {
173        match kind {
174            JobKind::ScanSource => &mut self.scan,
175            JobKind::ExtractFile => &mut self.extract,
176            JobKind::ChunkFile => &mut self.chunk,
177            JobKind::UpdateKeywordIndex => &mut self.keyword,
178            JobKind::GenerateEmbedding => &mut self.embedding,
179            JobKind::Cleanup | JobKind::Repair => &mut self.maintenance,
180        }
181    }
182
183    /// Total pending jobs across all queues.
184    pub fn total_pending(&self) -> usize {
185        self.scan.len()
186            + self.extract.len()
187            + self.chunk.len()
188            + self.keyword.len()
189            + self.embedding.len()
190            + self.maintenance.len()
191    }
192
193    /// Cancel all queued jobs for a source (RFC-036 §12.3).
194    pub fn cancel_source(&mut self, source_id: &orbok_core::SourceId) -> usize {
195        self.scan.cancel_for_source(source_id)
196            + self.extract.cancel_for_source(source_id)
197            + self.chunk.cancel_for_source(source_id)
198            + self.keyword.cancel_for_source(source_id)
199            + self.embedding.cancel_for_source(source_id)
200            + self.maintenance.cancel_for_source(source_id)
201    }
202
203    /// Pop the next job to run, respecting resource mode (RFC-036 §8, §13).
204    ///
205    /// In `UserActive` mode, embedding is skipped so search is never
206    /// delayed (RFC-036 §9.2 embedding rule).
207    pub fn pop_next(&mut self, resource_mode: super::job::ResourceMode) -> Option<IndexJob> {
208        use super::job::ResourceMode;
209
210        // Priority order: scan → extract → chunk → keyword → embedding →
211        // maintenance. Embedding is skipped entirely in UserActive mode.
212        let queues: &mut [&mut BoundedQueue] = &mut [
213            &mut self.scan,
214            &mut self.extract,
215            &mut self.chunk,
216            &mut self.keyword,
217            &mut self.embedding,
218            &mut self.maintenance,
219        ];
220
221        for q in queues.iter_mut() {
222            if q.kind() == QueueKind::Embedding && resource_mode == ResourceMode::UserActive {
223                continue; // RFC-036 §9.2: yield embedding to active search.
224            }
225            if let Some(job) = q.pop() {
226                return Some(job);
227            }
228        }
229        None
230    }
231}