1use std::collections::VecDeque;
20
21use murk_core::command::{Command, Receipt};
22use murk_core::error::IngressError;
23use murk_core::id::TickId;
24
25#[derive(Debug)]
30pub struct DrainedCommand {
31 pub command: Command,
33 pub command_index: usize,
35}
36
37#[derive(Debug)]
39pub struct DrainResult {
40 pub commands: Vec<DrainedCommand>,
42 pub expired_receipts: Vec<Receipt>,
44}
45
46struct QueueEntry {
51 command: Command,
52 command_index: usize,
53}
54
55pub struct IngressQueue {
61 queue: VecDeque<QueueEntry>,
62 capacity: usize,
63 next_arrival_seq: u64,
64}
65
66impl IngressQueue {
67 pub fn new(capacity: usize) -> Self {
73 assert!(capacity > 0, "IngressQueue capacity must be at least 1");
74 Self {
75 queue: VecDeque::with_capacity(capacity),
76 capacity,
77 next_arrival_seq: 0,
78 }
79 }
80
81 pub fn submit(&mut self, commands: Vec<Command>, tick_disabled: bool) -> Vec<Receipt> {
92 let mut receipts = Vec::with_capacity(commands.len());
93
94 for (i, mut cmd) in commands.into_iter().enumerate() {
95 if tick_disabled {
96 receipts.push(Receipt {
97 accepted: false,
98 applied_tick_id: None,
99 reason_code: Some(IngressError::TickDisabled),
100 command_index: i,
101 });
102 continue;
103 }
104
105 if self.queue.len() >= self.capacity {
106 receipts.push(Receipt {
107 accepted: false,
108 applied_tick_id: None,
109 reason_code: Some(IngressError::QueueFull),
110 command_index: i,
111 });
112 continue;
113 }
114
115 cmd.arrival_seq = self.next_arrival_seq;
116 self.next_arrival_seq += 1;
117 self.queue.push_back(QueueEntry {
118 command: cmd,
119 command_index: i,
120 });
121
122 receipts.push(Receipt {
123 accepted: true,
124 applied_tick_id: None,
125 reason_code: None,
126 command_index: i,
127 });
128 }
129
130 receipts
131 }
132
133 pub fn drain(&mut self, current_tick: TickId) -> DrainResult {
142 let mut valid = Vec::new();
143 let mut expired_receipts = Vec::new();
144
145 for entry in self.queue.drain(..) {
146 if entry.command.expires_after_tick.0 < current_tick.0 {
147 expired_receipts.push(Receipt {
148 accepted: true,
149 applied_tick_id: None,
150 reason_code: Some(IngressError::Stale),
151 command_index: entry.command_index,
152 });
153 } else {
154 valid.push(DrainedCommand {
155 command: entry.command,
156 command_index: entry.command_index,
157 });
158 }
159 }
160
161 valid.sort_unstable_by_key(|dc| {
163 (
164 dc.command.priority_class,
165 dc.command.source_id.unwrap_or(u64::MAX),
166 dc.command.source_seq.unwrap_or(u64::MAX),
167 dc.command.arrival_seq,
168 )
169 });
170
171 DrainResult {
172 commands: valid,
173 expired_receipts,
174 }
175 }
176
177 pub fn len(&self) -> usize {
179 self.queue.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.queue.is_empty()
185 }
186
187 pub fn capacity(&self) -> usize {
189 self.capacity
190 }
191
192 pub fn clear(&mut self) {
197 self.queue.clear();
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use murk_core::command::CommandPayload;
205 use murk_core::id::ParameterKey;
206
207 fn make_cmd(priority: u8, expires: u64) -> Command {
208 Command {
209 payload: CommandPayload::SetParameter {
210 key: ParameterKey(0),
211 value: 0.0,
212 },
213 expires_after_tick: TickId(expires),
214 source_id: None,
215 source_seq: None,
216 priority_class: priority,
217 arrival_seq: 0,
218 }
219 }
220
221 fn make_sourced_cmd(priority: u8, source_id: u64, source_seq: u64, expires: u64) -> Command {
222 Command {
223 payload: CommandPayload::SetParameter {
224 key: ParameterKey(0),
225 value: 0.0,
226 },
227 expires_after_tick: TickId(expires),
228 source_id: Some(source_id),
229 source_seq: Some(source_seq),
230 priority_class: priority,
231 arrival_seq: 0,
232 }
233 }
234
235 #[test]
238 fn submit_assigns_monotonic_arrival_seq() {
239 let mut q = IngressQueue::new(10);
240 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
241 let receipts = q.submit(cmds, false);
242 assert_eq!(receipts.len(), 3);
243 assert!(receipts.iter().all(|r| r.accepted));
244
245 let result = q.drain(TickId(0));
247 assert_eq!(result.commands[0].command.arrival_seq, 0);
248 assert_eq!(result.commands[1].command.arrival_seq, 1);
249 assert_eq!(result.commands[2].command.arrival_seq, 2);
250 }
251
252 #[test]
253 fn submit_rejects_when_full() {
254 let mut q = IngressQueue::new(2);
255 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
256 let receipts = q.submit(cmds, false);
257 assert!(receipts[0].accepted);
258 assert!(receipts[1].accepted);
259 assert!(!receipts[2].accepted);
260 assert_eq!(receipts[2].reason_code, Some(IngressError::QueueFull));
261 }
262
263 #[test]
264 fn submit_rejects_when_tick_disabled() {
265 let mut q = IngressQueue::new(10);
266 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100)];
267 let receipts = q.submit(cmds, true);
268 assert!(!receipts[0].accepted);
269 assert_eq!(receipts[0].reason_code, Some(IngressError::TickDisabled));
270 assert!(!receipts[1].accepted);
271 assert_eq!(receipts[1].reason_code, Some(IngressError::TickDisabled));
272 assert!(q.is_empty());
273 }
274
275 #[test]
276 fn submit_partial_accept_on_overflow() {
277 let mut q = IngressQueue::new(3);
278 let cmds = vec![
279 make_cmd(1, 100),
280 make_cmd(1, 100),
281 make_cmd(1, 100),
282 make_cmd(1, 100),
283 make_cmd(1, 100),
284 ];
285 let receipts = q.submit(cmds, false);
286 assert_eq!(receipts.len(), 5);
287 assert!(receipts[0].accepted);
288 assert!(receipts[1].accepted);
289 assert!(receipts[2].accepted);
290 assert!(!receipts[3].accepted);
291 assert_eq!(receipts[3].reason_code, Some(IngressError::QueueFull));
292 assert!(!receipts[4].accepted);
293 assert_eq!(receipts[4].reason_code, Some(IngressError::QueueFull));
294 assert_eq!(q.len(), 3);
295 }
296
297 #[test]
298 fn arrival_seq_persists_across_submits() {
299 let mut q = IngressQueue::new(10);
300 q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
301 q.submit(vec![make_cmd(1, 100)], false);
302 let result = q.drain(TickId(0));
303 assert_eq!(result.commands[0].command.arrival_seq, 0);
304 assert_eq!(result.commands[1].command.arrival_seq, 1);
305 assert_eq!(result.commands[2].command.arrival_seq, 2);
306 }
307
308 #[test]
309 fn receipt_command_index_matches_input() {
310 let mut q = IngressQueue::new(10);
311 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
312 let receipts = q.submit(cmds, false);
313 assert_eq!(receipts[0].command_index, 0);
314 assert_eq!(receipts[1].command_index, 1);
315 assert_eq!(receipts[2].command_index, 2);
316 }
317
318 #[test]
321 fn drain_removes_expired_commands() {
322 let mut q = IngressQueue::new(10);
323 q.submit(vec![make_cmd(1, 3)], false);
325 let result = q.drain(TickId(4));
326 assert!(result.commands.is_empty());
327 assert_eq!(result.expired_receipts.len(), 1);
328 assert_eq!(
329 result.expired_receipts[0].reason_code,
330 Some(IngressError::Stale)
331 );
332 }
333
334 #[test]
335 fn drain_keeps_valid_commands() {
336 let mut q = IngressQueue::new(10);
337 q.submit(vec![make_cmd(1, 10)], false);
339 let result = q.drain(TickId(5));
340 assert_eq!(result.commands.len(), 1);
341 assert!(result.expired_receipts.is_empty());
342 }
343
344 #[test]
345 fn drain_boundary_expires_after_equals_current() {
346 let mut q = IngressQueue::new(10);
347 q.submit(vec![make_cmd(1, 5)], false);
349 let result = q.drain(TickId(5));
350 assert_eq!(result.commands.len(), 1);
351 assert!(result.expired_receipts.is_empty());
352 }
353
354 #[test]
355 fn drain_sorts_by_priority() {
356 let mut q = IngressQueue::new(10);
357 q.submit(
358 vec![make_cmd(2, 100), make_cmd(0, 100), make_cmd(1, 100)],
359 false,
360 );
361 let result = q.drain(TickId(0));
362 assert_eq!(result.commands[0].command.priority_class, 0);
363 assert_eq!(result.commands[1].command.priority_class, 1);
364 assert_eq!(result.commands[2].command.priority_class, 2);
365 }
366
367 #[test]
368 fn drain_sorts_by_source_within_priority() {
369 let mut q = IngressQueue::new(10);
370 q.submit(
371 vec![
372 make_sourced_cmd(1, 10, 2, 100),
373 make_sourced_cmd(1, 10, 1, 100),
374 make_sourced_cmd(1, 5, 0, 100),
375 ],
376 false,
377 );
378 let result = q.drain(TickId(0));
379 assert_eq!(result.commands[0].command.source_id, Some(5));
381 assert_eq!(result.commands[0].command.source_seq, Some(0));
382 assert_eq!(result.commands[1].command.source_id, Some(10));
384 assert_eq!(result.commands[1].command.source_seq, Some(1));
385 assert_eq!(result.commands[2].command.source_id, Some(10));
386 assert_eq!(result.commands[2].command.source_seq, Some(2));
387 }
388
389 #[test]
390 fn drain_sorts_by_arrival_seq_when_no_source() {
391 let mut q = IngressQueue::new(10);
392 q.submit(
394 vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)],
395 false,
396 );
397 let result = q.drain(TickId(0));
398 assert_eq!(result.commands[0].command.arrival_seq, 0);
399 assert_eq!(result.commands[1].command.arrival_seq, 1);
400 assert_eq!(result.commands[2].command.arrival_seq, 2);
401 }
402
403 #[test]
404 fn drain_mixed_source_and_no_source() {
405 let mut q = IngressQueue::new(10);
406 q.submit(
407 vec![
408 make_cmd(1, 100), make_sourced_cmd(1, 5, 0, 100), ],
411 false,
412 );
413 let result = q.drain(TickId(0));
414 assert_eq!(result.commands[0].command.source_id, Some(5));
416 assert_eq!(result.commands[1].command.source_id, None);
417 }
418
419 #[test]
420 fn drain_empty_queue() {
421 let mut q = IngressQueue::new(10);
422 let result = q.drain(TickId(0));
423 assert!(result.commands.is_empty());
424 assert!(result.expired_receipts.is_empty());
425 }
426
427 #[test]
428 fn drain_all_expired() {
429 let mut q = IngressQueue::new(10);
430 q.submit(vec![make_cmd(1, 0), make_cmd(1, 1), make_cmd(1, 2)], false);
431 let result = q.drain(TickId(10));
432 assert!(result.commands.is_empty());
433 assert_eq!(result.expired_receipts.len(), 3);
434 }
435
436 #[test]
437 fn drain_expired_receipts_preserve_command_index() {
438 let mut q = IngressQueue::new(10);
439 q.submit(
441 vec![
442 make_cmd(1, 100), make_cmd(1, 2), make_cmd(1, 1), make_cmd(1, 100), ],
447 false,
448 );
449 let result = q.drain(TickId(3));
450 assert_eq!(result.commands.len(), 2);
451 assert_eq!(result.expired_receipts.len(), 2);
452 assert_eq!(result.expired_receipts[0].command_index, 1);
454 assert_eq!(result.expired_receipts[1].command_index, 2);
455 }
456
457 #[test]
458 fn drain_expired_receipts_across_batches() {
459 let mut q = IngressQueue::new(10);
460 q.submit(vec![make_cmd(1, 0), make_cmd(1, 100)], false);
462 q.submit(vec![make_cmd(1, 100), make_cmd(1, 0)], false);
464 let result = q.drain(TickId(5));
465 assert_eq!(result.commands.len(), 2);
466 assert_eq!(result.expired_receipts.len(), 2);
467 assert_eq!(result.expired_receipts[0].command_index, 0);
469 assert_eq!(result.expired_receipts[1].command_index, 1);
471 }
472
473 #[test]
474 fn drain_clears_queue() {
475 let mut q = IngressQueue::new(10);
476 q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
477 assert_eq!(q.len(), 2);
478 let _ = q.drain(TickId(0));
479 assert!(q.is_empty());
480 }
481
482 mod proptests {
485 use super::*;
486 use proptest::prelude::*;
487
488 fn arb_command() -> impl Strategy<Value = Command> {
489 (
490 0u8..4,
491 any::<u64>(),
492 prop::option::of(0u64..100),
493 prop::option::of(0u64..100),
494 )
495 .prop_map(|(prio, expires, src_id, src_seq)| Command {
496 payload: CommandPayload::SetParameter {
497 key: ParameterKey(0),
498 value: 0.0,
499 },
500 expires_after_tick: TickId(expires),
501 source_id: src_id,
502 source_seq: src_seq,
503 priority_class: prio,
504 arrival_seq: 0,
505 })
506 }
507
508 proptest! {
509 #[test]
510 fn drain_always_sorted(commands in prop::collection::vec(arb_command(), 0..64)) {
511 let mut q = IngressQueue::new(128);
512 q.submit(commands, false);
513 let result = q.drain(TickId(0));
514
515 for window in result.commands.windows(2) {
517 let a = &window[0].command;
518 let b = &window[1].command;
519 let key_a = (
520 a.priority_class,
521 a.source_id.unwrap_or(u64::MAX),
522 a.source_seq.unwrap_or(u64::MAX),
523 a.arrival_seq,
524 );
525 let key_b = (
526 b.priority_class,
527 b.source_id.unwrap_or(u64::MAX),
528 b.source_seq.unwrap_or(u64::MAX),
529 b.arrival_seq,
530 );
531 prop_assert!(key_a <= key_b, "sort violated: {key_a:?} > {key_b:?}");
532 }
533 }
534 }
535 }
536}