1use std::collections::VecDeque;
20
21use murk_core::command::{Command, Receipt};
22use murk_core::error::IngressError;
23use murk_core::id::TickId;
24
25#[derive(Debug)]
30pub struct DrainedCommand {
31 pub command: Command,
33 pub command_index: usize,
35}
36
37#[derive(Debug)]
39pub struct DrainResult {
40 pub commands: Vec<DrainedCommand>,
42 pub expired_receipts: Vec<Receipt>,
44}
45
46struct QueueEntry {
51 command: Command,
52 command_index: usize,
53}
54
55pub struct IngressQueue {
61 queue: VecDeque<QueueEntry>,
62 capacity: usize,
63 next_arrival_seq: u64,
64}
65
66impl IngressQueue {
67 pub fn new(capacity: usize) -> Self {
73 assert!(capacity > 0, "IngressQueue capacity must be at least 1");
74 Self {
75 queue: VecDeque::with_capacity(capacity),
76 capacity,
77 next_arrival_seq: 0,
78 }
79 }
80
81 pub fn submit(&mut self, commands: Vec<Command>, tick_disabled: bool) -> Vec<Receipt> {
92 let mut receipts = Vec::with_capacity(commands.len());
93
94 for (i, mut cmd) in commands.into_iter().enumerate() {
95 if tick_disabled {
96 receipts.push(Receipt {
97 accepted: false,
98 applied_tick_id: None,
99 reason_code: Some(IngressError::TickDisabled),
100 command_index: i,
101 });
102 continue;
103 }
104
105 if self.queue.len() >= self.capacity {
106 receipts.push(Receipt {
107 accepted: false,
108 applied_tick_id: None,
109 reason_code: Some(IngressError::QueueFull),
110 command_index: i,
111 });
112 continue;
113 }
114
115 if cmd.source_id.is_none() {
117 cmd.source_seq = None;
118 }
119 cmd.arrival_seq = self.next_arrival_seq;
120 self.next_arrival_seq += 1;
121 self.queue.push_back(QueueEntry {
122 command: cmd,
123 command_index: i,
124 });
125
126 receipts.push(Receipt {
127 accepted: true,
128 applied_tick_id: None,
129 reason_code: None,
130 command_index: i,
131 });
132 }
133
134 receipts
135 }
136
137 pub fn drain(&mut self, current_tick: TickId) -> DrainResult {
146 let mut valid = Vec::new();
147 let mut expired_receipts = Vec::new();
148
149 for entry in self.queue.drain(..) {
150 if entry.command.expires_after_tick.0 < current_tick.0 {
151 expired_receipts.push(Receipt {
152 accepted: true,
153 applied_tick_id: None,
154 reason_code: Some(IngressError::Stale),
155 command_index: entry.command_index,
156 });
157 } else {
158 valid.push(DrainedCommand {
159 command: entry.command,
160 command_index: entry.command_index,
161 });
162 }
163 }
164
165 valid.sort_unstable_by_key(|dc| {
167 let c = &dc.command;
168 (
169 c.priority_class,
170 c.source_id.unwrap_or(u64::MAX),
171 if c.source_id.is_some() {
173 c.source_seq.unwrap_or(u64::MAX)
174 } else {
175 u64::MAX
176 },
177 c.arrival_seq,
178 )
179 });
180
181 DrainResult {
182 commands: valid,
183 expired_receipts,
184 }
185 }
186
187 pub fn len(&self) -> usize {
189 self.queue.len()
190 }
191
192 pub fn is_empty(&self) -> bool {
194 self.queue.is_empty()
195 }
196
197 pub fn capacity(&self) -> usize {
199 self.capacity
200 }
201
202 pub fn clear(&mut self) {
207 self.queue.clear();
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use murk_core::command::CommandPayload;
215 use murk_core::id::ParameterKey;
216
217 fn make_cmd(priority: u8, expires: u64) -> Command {
218 Command {
219 payload: CommandPayload::SetParameter {
220 key: ParameterKey(0),
221 value: 0.0,
222 },
223 expires_after_tick: TickId(expires),
224 source_id: None,
225 source_seq: None,
226 priority_class: priority,
227 arrival_seq: 0,
228 }
229 }
230
231 fn make_sourced_cmd(priority: u8, source_id: u64, source_seq: u64, expires: u64) -> Command {
232 Command {
233 payload: CommandPayload::SetParameter {
234 key: ParameterKey(0),
235 value: 0.0,
236 },
237 expires_after_tick: TickId(expires),
238 source_id: Some(source_id),
239 source_seq: Some(source_seq),
240 priority_class: priority,
241 arrival_seq: 0,
242 }
243 }
244
245 #[test]
248 fn submit_assigns_monotonic_arrival_seq() {
249 let mut q = IngressQueue::new(10);
250 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
251 let receipts = q.submit(cmds, false);
252 assert_eq!(receipts.len(), 3);
253 assert!(receipts.iter().all(|r| r.accepted));
254
255 let result = q.drain(TickId(0));
257 assert_eq!(result.commands[0].command.arrival_seq, 0);
258 assert_eq!(result.commands[1].command.arrival_seq, 1);
259 assert_eq!(result.commands[2].command.arrival_seq, 2);
260 }
261
262 #[test]
263 fn submit_rejects_when_full() {
264 let mut q = IngressQueue::new(2);
265 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
266 let receipts = q.submit(cmds, false);
267 assert!(receipts[0].accepted);
268 assert!(receipts[1].accepted);
269 assert!(!receipts[2].accepted);
270 assert_eq!(receipts[2].reason_code, Some(IngressError::QueueFull));
271 }
272
273 #[test]
274 fn submit_rejects_when_tick_disabled() {
275 let mut q = IngressQueue::new(10);
276 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100)];
277 let receipts = q.submit(cmds, true);
278 assert!(!receipts[0].accepted);
279 assert_eq!(receipts[0].reason_code, Some(IngressError::TickDisabled));
280 assert!(!receipts[1].accepted);
281 assert_eq!(receipts[1].reason_code, Some(IngressError::TickDisabled));
282 assert!(q.is_empty());
283 }
284
285 #[test]
286 fn submit_partial_accept_on_overflow() {
287 let mut q = IngressQueue::new(3);
288 let cmds = vec![
289 make_cmd(1, 100),
290 make_cmd(1, 100),
291 make_cmd(1, 100),
292 make_cmd(1, 100),
293 make_cmd(1, 100),
294 ];
295 let receipts = q.submit(cmds, false);
296 assert_eq!(receipts.len(), 5);
297 assert!(receipts[0].accepted);
298 assert!(receipts[1].accepted);
299 assert!(receipts[2].accepted);
300 assert!(!receipts[3].accepted);
301 assert_eq!(receipts[3].reason_code, Some(IngressError::QueueFull));
302 assert!(!receipts[4].accepted);
303 assert_eq!(receipts[4].reason_code, Some(IngressError::QueueFull));
304 assert_eq!(q.len(), 3);
305 }
306
307 #[test]
308 fn arrival_seq_persists_across_submits() {
309 let mut q = IngressQueue::new(10);
310 q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
311 q.submit(vec![make_cmd(1, 100)], false);
312 let result = q.drain(TickId(0));
313 assert_eq!(result.commands[0].command.arrival_seq, 0);
314 assert_eq!(result.commands[1].command.arrival_seq, 1);
315 assert_eq!(result.commands[2].command.arrival_seq, 2);
316 }
317
318 #[test]
319 fn receipt_command_index_matches_input() {
320 let mut q = IngressQueue::new(10);
321 let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
322 let receipts = q.submit(cmds, false);
323 assert_eq!(receipts[0].command_index, 0);
324 assert_eq!(receipts[1].command_index, 1);
325 assert_eq!(receipts[2].command_index, 2);
326 }
327
328 #[test]
331 fn drain_removes_expired_commands() {
332 let mut q = IngressQueue::new(10);
333 q.submit(vec![make_cmd(1, 3)], false);
335 let result = q.drain(TickId(4));
336 assert!(result.commands.is_empty());
337 assert_eq!(result.expired_receipts.len(), 1);
338 assert_eq!(
339 result.expired_receipts[0].reason_code,
340 Some(IngressError::Stale)
341 );
342 }
343
344 #[test]
345 fn drain_keeps_valid_commands() {
346 let mut q = IngressQueue::new(10);
347 q.submit(vec![make_cmd(1, 10)], false);
349 let result = q.drain(TickId(5));
350 assert_eq!(result.commands.len(), 1);
351 assert!(result.expired_receipts.is_empty());
352 }
353
354 #[test]
355 fn drain_boundary_expires_after_equals_current() {
356 let mut q = IngressQueue::new(10);
357 q.submit(vec![make_cmd(1, 5)], false);
359 let result = q.drain(TickId(5));
360 assert_eq!(result.commands.len(), 1);
361 assert!(result.expired_receipts.is_empty());
362 }
363
364 #[test]
365 fn drain_sorts_by_priority() {
366 let mut q = IngressQueue::new(10);
367 q.submit(
368 vec![make_cmd(2, 100), make_cmd(0, 100), make_cmd(1, 100)],
369 false,
370 );
371 let result = q.drain(TickId(0));
372 assert_eq!(result.commands[0].command.priority_class, 0);
373 assert_eq!(result.commands[1].command.priority_class, 1);
374 assert_eq!(result.commands[2].command.priority_class, 2);
375 }
376
377 #[test]
378 fn drain_sorts_by_source_within_priority() {
379 let mut q = IngressQueue::new(10);
380 q.submit(
381 vec![
382 make_sourced_cmd(1, 10, 2, 100),
383 make_sourced_cmd(1, 10, 1, 100),
384 make_sourced_cmd(1, 5, 0, 100),
385 ],
386 false,
387 );
388 let result = q.drain(TickId(0));
389 assert_eq!(result.commands[0].command.source_id, Some(5));
391 assert_eq!(result.commands[0].command.source_seq, Some(0));
392 assert_eq!(result.commands[1].command.source_id, Some(10));
394 assert_eq!(result.commands[1].command.source_seq, Some(1));
395 assert_eq!(result.commands[2].command.source_id, Some(10));
396 assert_eq!(result.commands[2].command.source_seq, Some(2));
397 }
398
399 #[test]
400 fn drain_sorts_by_arrival_seq_when_no_source() {
401 let mut q = IngressQueue::new(10);
402 q.submit(
404 vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)],
405 false,
406 );
407 let result = q.drain(TickId(0));
408 assert_eq!(result.commands[0].command.arrival_seq, 0);
409 assert_eq!(result.commands[1].command.arrival_seq, 1);
410 assert_eq!(result.commands[2].command.arrival_seq, 2);
411 }
412
413 #[test]
414 fn drain_mixed_source_and_no_source() {
415 let mut q = IngressQueue::new(10);
416 q.submit(
417 vec![
418 make_cmd(1, 100), make_sourced_cmd(1, 5, 0, 100), ],
421 false,
422 );
423 let result = q.drain(TickId(0));
424 assert_eq!(result.commands[0].command.source_id, Some(5));
426 assert_eq!(result.commands[1].command.source_id, None);
427 }
428
429 #[test]
430 fn drain_empty_queue() {
431 let mut q = IngressQueue::new(10);
432 let result = q.drain(TickId(0));
433 assert!(result.commands.is_empty());
434 assert!(result.expired_receipts.is_empty());
435 }
436
437 #[test]
438 fn drain_all_expired() {
439 let mut q = IngressQueue::new(10);
440 q.submit(vec![make_cmd(1, 0), make_cmd(1, 1), make_cmd(1, 2)], false);
441 let result = q.drain(TickId(10));
442 assert!(result.commands.is_empty());
443 assert_eq!(result.expired_receipts.len(), 3);
444 }
445
446 #[test]
447 fn drain_expired_receipts_preserve_command_index() {
448 let mut q = IngressQueue::new(10);
449 q.submit(
451 vec![
452 make_cmd(1, 100), make_cmd(1, 2), make_cmd(1, 1), make_cmd(1, 100), ],
457 false,
458 );
459 let result = q.drain(TickId(3));
460 assert_eq!(result.commands.len(), 2);
461 assert_eq!(result.expired_receipts.len(), 2);
462 assert_eq!(result.expired_receipts[0].command_index, 1);
464 assert_eq!(result.expired_receipts[1].command_index, 2);
465 }
466
467 #[test]
468 fn drain_expired_receipts_across_batches() {
469 let mut q = IngressQueue::new(10);
470 q.submit(vec![make_cmd(1, 0), make_cmd(1, 100)], false);
472 q.submit(vec![make_cmd(1, 100), make_cmd(1, 0)], false);
474 let result = q.drain(TickId(5));
475 assert_eq!(result.commands.len(), 2);
476 assert_eq!(result.expired_receipts.len(), 2);
477 assert_eq!(result.expired_receipts[0].command_index, 0);
479 assert_eq!(result.expired_receipts[1].command_index, 1);
481 }
482
483 #[test]
484 fn drain_clears_queue() {
485 let mut q = IngressQueue::new(10);
486 q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
487 assert_eq!(q.len(), 2);
488 let _ = q.drain(TickId(0));
489 assert!(q.is_empty());
490 }
491
492 mod proptests {
495 use super::*;
496 use proptest::prelude::*;
497
498 fn arb_command() -> impl Strategy<Value = Command> {
499 (
500 0u8..4,
501 any::<u64>(),
502 prop::option::of(0u64..100),
503 prop::option::of(0u64..100),
504 )
505 .prop_map(|(prio, expires, src_id, src_seq)| Command {
506 payload: CommandPayload::SetParameter {
507 key: ParameterKey(0),
508 value: 0.0,
509 },
510 expires_after_tick: TickId(expires),
511 source_id: src_id,
512 source_seq: src_seq,
513 priority_class: prio,
514 arrival_seq: 0,
515 })
516 }
517
518 proptest! {
519 #[test]
520 fn drain_always_sorted(commands in prop::collection::vec(arb_command(), 0..64)) {
521 let mut q = IngressQueue::new(128);
522 q.submit(commands, false);
523 let result = q.drain(TickId(0));
524
525 for window in result.commands.windows(2) {
527 let a = &window[0].command;
528 let b = &window[1].command;
529 let key_a = (
530 a.priority_class,
531 a.source_id.unwrap_or(u64::MAX),
532 if a.source_id.is_some() { a.source_seq.unwrap_or(u64::MAX) } else { u64::MAX },
533 a.arrival_seq,
534 );
535 let key_b = (
536 b.priority_class,
537 b.source_id.unwrap_or(u64::MAX),
538 if b.source_id.is_some() { b.source_seq.unwrap_or(u64::MAX) } else { u64::MAX },
539 b.arrival_seq,
540 );
541 prop_assert!(key_a <= key_b, "sort violated: {key_a:?} > {key_b:?}");
542 }
543 }
544 }
545 }
546}