Skip to main content

murk_engine/
ingress.rs

1//! Bounded ingress queue with deterministic ordering and TTL evaluation.
2//!
3//! [`IngressQueue`] buffers commands between submission and tick execution.
4//! It enforces capacity limits, assigns monotonic arrival sequence numbers,
5//! evaluates TTL expiry, and sorts commands into a deterministic order for
6//! processing by the tick engine.
7//!
8//! # Ordering
9//!
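//! Commands are sorted by the composite key
//! `(priority_class, source_id|MAX, source_seq|MAX, arrival_seq)`,
//! where `x|MAX` means a missing (`None`) value is compared as `u64::MAX`,
//! placing it after any smaller present value.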
12//!
13//! This ensures:
14//! - Lower priority class values execute first (0 = system, 1 = user).
15//! - Within a priority class, source-keyed commands sort before anonymous ones.
16//! - Source-keyed commands from the same source execute in sequence order.
17//! - Anonymous commands execute in arrival order.
18
19use std::collections::VecDeque;
20
21use murk_core::command::{Command, Receipt};
22use murk_core::error::IngressError;
23use murk_core::id::TickId;
24
25/// A command paired with its original batch-local index from `submit()`.
26///
27/// Returned in [`DrainResult::commands`] so the tick engine can build
28/// receipts with correct `command_index` values even after priority reordering.
29#[derive(Debug)]
30pub struct DrainedCommand {
31    /// The command to execute.
32    pub command: Command,
33    /// The original batch-local index from the `submit()` call.
34    pub command_index: usize,
35}
36
37/// Result of draining the queue at the start of a tick.
38#[derive(Debug)]
39pub struct DrainResult {
40    /// Commands that passed TTL checks, sorted in deterministic order.
41    pub commands: Vec<DrainedCommand>,
42    /// Receipts for commands that expired before reaching the current tick.
43    pub expired_receipts: Vec<Receipt>,
44}
45
46/// A command paired with its original batch-local index.
47///
48/// Preserves the `command_index` from the `submit()` call so that
49/// `drain()` can produce correct receipts for expired commands.
50struct QueueEntry {
51    command: Command,
52    command_index: usize,
53}
54
55/// Bounded command queue for the ingress pipeline.
56///
57/// Accepts batches of commands via [`submit()`](IngressQueue::submit),
58/// assigns monotonic arrival sequence numbers, and produces a sorted,
59/// TTL-filtered batch via [`drain()`](IngressQueue::drain).
60pub struct IngressQueue {
61    queue: VecDeque<QueueEntry>,
62    capacity: usize,
63    next_arrival_seq: u64,
64}
65
66impl IngressQueue {
67    /// Create a new queue with the given capacity.
68    ///
69    /// # Panics
70    ///
71    /// Panics if `capacity` is zero.
72    pub fn new(capacity: usize) -> Self {
73        assert!(capacity > 0, "IngressQueue capacity must be at least 1");
74        Self {
75            queue: VecDeque::with_capacity(capacity),
76            capacity,
77            next_arrival_seq: 0,
78        }
79    }
80
81    /// Submit a batch of commands to the queue.
82    ///
83    /// Returns one [`Receipt`] per input command. Commands are accepted
84    /// in order until the queue is full; remaining commands receive
85    /// `QueueFull` receipts. If `tick_disabled` is true, all commands
86    /// are rejected with `TickDisabled`.
87    ///
88    /// Arrival sequence numbers are assigned from a monotonic counter
89    /// that persists across submit calls, overwriting whatever value
90    /// the caller may have set on `Command::arrival_seq`.
91    pub fn submit(&mut self, commands: Vec<Command>, tick_disabled: bool) -> Vec<Receipt> {
92        let mut receipts = Vec::with_capacity(commands.len());
93
94        for (i, mut cmd) in commands.into_iter().enumerate() {
95            if tick_disabled {
96                receipts.push(Receipt {
97                    accepted: false,
98                    applied_tick_id: None,
99                    reason_code: Some(IngressError::TickDisabled),
100                    command_index: i,
101                });
102                continue;
103            }
104
105            if self.queue.len() >= self.capacity {
106                receipts.push(Receipt {
107                    accepted: false,
108                    applied_tick_id: None,
109                    reason_code: Some(IngressError::QueueFull),
110                    command_index: i,
111                });
112                continue;
113            }
114
115            cmd.arrival_seq = self.next_arrival_seq;
116            self.next_arrival_seq += 1;
117            self.queue.push_back(QueueEntry {
118                command: cmd,
119                command_index: i,
120            });
121
122            receipts.push(Receipt {
123                accepted: true,
124                applied_tick_id: None,
125                reason_code: None,
126                command_index: i,
127            });
128        }
129
130        receipts
131    }
132
133    /// Drain the queue, filtering expired commands and sorting the rest.
134    ///
135    /// A command is expired if `cmd.expires_after_tick < current_tick`.
136    /// A command with `expires_after_tick == current_tick` is **valid**
137    /// during that tick.
138    ///
139    /// Returns a [`DrainResult`] containing the sorted valid commands
140    /// and receipts for expired commands.
141    pub fn drain(&mut self, current_tick: TickId) -> DrainResult {
142        let mut valid = Vec::new();
143        let mut expired_receipts = Vec::new();
144
145        for entry in self.queue.drain(..) {
146            if entry.command.expires_after_tick.0 < current_tick.0 {
147                expired_receipts.push(Receipt {
148                    accepted: true,
149                    applied_tick_id: None,
150                    reason_code: Some(IngressError::Stale),
151                    command_index: entry.command_index,
152                });
153            } else {
154                valid.push(DrainedCommand {
155                    command: entry.command,
156                    command_index: entry.command_index,
157                });
158            }
159        }
160
161        // Deterministic sort: (priority_class, source_id|MAX, source_seq|MAX, arrival_seq)
162        valid.sort_unstable_by_key(|dc| {
163            (
164                dc.command.priority_class,
165                dc.command.source_id.unwrap_or(u64::MAX),
166                dc.command.source_seq.unwrap_or(u64::MAX),
167                dc.command.arrival_seq,
168            )
169        });
170
171        DrainResult {
172            commands: valid,
173            expired_receipts,
174        }
175    }
176
177    /// Number of commands currently buffered.
178    pub fn len(&self) -> usize {
179        self.queue.len()
180    }
181
182    /// Whether the queue is empty.
183    pub fn is_empty(&self) -> bool {
184        self.queue.is_empty()
185    }
186
187    /// Maximum number of commands this queue can hold.
188    pub fn capacity(&self) -> usize {
189        self.capacity
190    }
191
192    /// Discard all pending commands.
193    ///
194    /// Called during [`TickEngine::reset()`](crate::TickEngine::reset) so
195    /// stale commands from previous ticks don't survive a reset.
196    pub fn clear(&mut self) {
197        self.queue.clear();
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::command::CommandPayload;
    use murk_core::id::ParameterKey;

    /// Anonymous command (no source id/seq) with the given priority and TTL.
    fn make_cmd(priority: u8, expires: u64) -> Command {
        Command {
            payload: CommandPayload::SetParameter {
                key: ParameterKey(0),
                value: 0.0,
            },
            expires_after_tick: TickId(expires),
            source_id: None,
            source_seq: None,
            priority_class: priority,
            arrival_seq: 0,
        }
    }

    /// Source-keyed command; otherwise identical to [`make_cmd`].
    fn make_sourced_cmd(priority: u8, source_id: u64, source_seq: u64, expires: u64) -> Command {
        Command {
            source_id: Some(source_id),
            source_seq: Some(source_seq),
            ..make_cmd(priority, expires)
        }
    }

    /// Arrival sequence numbers of the drained commands, in drained order.
    fn arrival_seqs(result: &DrainResult) -> Vec<u64> {
        result
            .commands
            .iter()
            .map(|dc| dc.command.arrival_seq)
            .collect()
    }

    // ── construction / accessors ───────────────────────────────

    #[test]
    #[should_panic(expected = "IngressQueue capacity must be at least 1")]
    fn new_panics_on_zero_capacity() {
        let _ = IngressQueue::new(0);
    }

    #[test]
    fn capacity_reports_configured_limit() {
        let mut q = IngressQueue::new(7);
        assert_eq!(q.capacity(), 7);
        assert!(q.is_empty());
        // Capacity is a fixed limit, not the current fill level.
        q.submit(vec![make_cmd(1, 100)], false);
        assert_eq!(q.capacity(), 7);
        assert_eq!(q.len(), 1);
    }

    // ── submit tests ───────────────────────────────────────────

    #[test]
    fn submit_assigns_monotonic_arrival_seq() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts.len(), 3);
        assert!(receipts.iter().all(|r| r.accepted));

        // Drain and check arrival_seq 0, 1, 2
        let result = q.drain(TickId(0));
        assert_eq!(arrival_seqs(&result), vec![0, 1, 2]);
    }

    #[test]
    fn submit_rejects_when_full() {
        let mut q = IngressQueue::new(2);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert!(receipts[0].accepted);
        assert!(receipts[1].accepted);
        assert!(!receipts[2].accepted);
        assert_eq!(receipts[2].reason_code, Some(IngressError::QueueFull));
    }

    #[test]
    fn submit_rejects_when_tick_disabled() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, true);
        assert_eq!(receipts.len(), 2);
        for (i, receipt) in receipts.iter().enumerate() {
            assert!(!receipt.accepted);
            assert_eq!(receipt.reason_code, Some(IngressError::TickDisabled));
            assert_eq!(receipt.command_index, i);
        }
        assert!(q.is_empty());
    }

    #[test]
    fn submit_partial_accept_on_overflow() {
        let mut q = IngressQueue::new(3);
        let cmds = vec![
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
        ];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts.len(), 5);
        assert!(receipts[0].accepted);
        assert!(receipts[1].accepted);
        assert!(receipts[2].accepted);
        assert!(!receipts[3].accepted);
        assert_eq!(receipts[3].reason_code, Some(IngressError::QueueFull));
        assert!(!receipts[4].accepted);
        assert_eq!(receipts[4].reason_code, Some(IngressError::QueueFull));
        assert_eq!(q.len(), 3);
    }

    #[test]
    fn arrival_seq_persists_across_submits() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        q.submit(vec![make_cmd(1, 100)], false);
        let result = q.drain(TickId(0));
        assert_eq!(arrival_seqs(&result), vec![0, 1, 2]);
    }

    #[test]
    fn receipt_command_index_matches_input() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts[0].command_index, 0);
        assert_eq!(receipts[1].command_index, 1);
        assert_eq!(receipts[2].command_index, 2);
    }

    // ── drain tests ────────────────────────────────────────────

    #[test]
    fn drain_removes_expired_commands() {
        let mut q = IngressQueue::new(10);
        // Expires after tick 3 → expired at tick 4
        q.submit(vec![make_cmd(1, 3)], false);
        let result = q.drain(TickId(4));
        assert!(result.commands.is_empty());
        assert_eq!(result.expired_receipts.len(), 1);
        assert_eq!(
            result.expired_receipts[0].reason_code,
            Some(IngressError::Stale)
        );
        assert!(result.expired_receipts[0].applied_tick_id.is_none());
    }

    #[test]
    fn drain_keeps_valid_commands() {
        let mut q = IngressQueue::new(10);
        // Expires after tick 10 → valid at tick 5
        q.submit(vec![make_cmd(1, 10)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 1);
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_boundary_expires_after_equals_current() {
        let mut q = IngressQueue::new(10);
        // expires_after_tick == 5, drain at tick 5 → VALID (still on that tick)
        q.submit(vec![make_cmd(1, 5)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 1);
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_sorts_by_priority() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![make_cmd(2, 100), make_cmd(0, 100), make_cmd(1, 100)],
            false,
        );
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.priority_class, 0);
        assert_eq!(result.commands[1].command.priority_class, 1);
        assert_eq!(result.commands[2].command.priority_class, 2);
    }

    #[test]
    fn drain_sorts_by_source_within_priority() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![
                make_sourced_cmd(1, 10, 2, 100),
                make_sourced_cmd(1, 10, 1, 100),
                make_sourced_cmd(1, 5, 0, 100),
            ],
            false,
        );
        let result = q.drain(TickId(0));
        // source_id 5 < 10, so it comes first
        assert_eq!(result.commands[0].command.source_id, Some(5));
        assert_eq!(result.commands[0].command.source_seq, Some(0));
        // Then source_id 10 seq 1 before seq 2
        assert_eq!(result.commands[1].command.source_id, Some(10));
        assert_eq!(result.commands[1].command.source_seq, Some(1));
        assert_eq!(result.commands[2].command.source_id, Some(10));
        assert_eq!(result.commands[2].command.source_seq, Some(2));
    }

    #[test]
    fn drain_sorts_by_arrival_seq_when_no_source() {
        let mut q = IngressQueue::new(10);
        // All same priority, no source → sorted by arrival_seq
        q.submit(
            vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)],
            false,
        );
        let result = q.drain(TickId(0));
        assert_eq!(arrival_seqs(&result), vec![0, 1, 2]);
    }

    #[test]
    fn drain_mixed_source_and_no_source() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![
                make_cmd(1, 100),               // no source → source_id=MAX
                make_sourced_cmd(1, 5, 0, 100), // source_id=5
            ],
            false,
        );
        let result = q.drain(TickId(0));
        // source_id 5 < u64::MAX, so sourced command comes first
        assert_eq!(result.commands[0].command.source_id, Some(5));
        assert_eq!(result.commands[1].command.source_id, None);
    }

    #[test]
    fn drain_empty_queue() {
        let mut q = IngressQueue::new(10);
        let result = q.drain(TickId(0));
        assert!(result.commands.is_empty());
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_all_expired() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 0), make_cmd(1, 1), make_cmd(1, 2)], false);
        let result = q.drain(TickId(10));
        assert!(result.commands.is_empty());
        assert_eq!(result.expired_receipts.len(), 3);
    }

    #[test]
    fn drain_expired_receipts_preserve_command_index() {
        let mut q = IngressQueue::new(10);
        // Submit a batch of 4 commands; the middle two will expire.
        q.submit(
            vec![
                make_cmd(1, 100), // index 0 — valid
                make_cmd(1, 2),   // index 1 — last valid tick is 2, expired by tick 3
                make_cmd(1, 1),   // index 2 — last valid tick is 1, expired by tick 3
                make_cmd(1, 100), // index 3 — valid
            ],
            false,
        );
        let result = q.drain(TickId(3));
        assert_eq!(result.commands.len(), 2);
        assert_eq!(result.expired_receipts.len(), 2);
        // Expired receipts carry their original batch indices, not 0.
        assert_eq!(result.expired_receipts[0].command_index, 1);
        assert_eq!(result.expired_receipts[1].command_index, 2);
        // Survivors keep their batch indices as well.
        assert_eq!(result.commands[0].command_index, 0);
        assert_eq!(result.commands[1].command_index, 3);
    }

    #[test]
    fn drain_expired_receipts_across_batches() {
        let mut q = IngressQueue::new(10);
        // Batch 1: indices 0, 1
        q.submit(vec![make_cmd(1, 0), make_cmd(1, 100)], false);
        // Batch 2: indices 0, 1 (new batch, indices reset)
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 0)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 2);
        assert_eq!(result.expired_receipts.len(), 2);
        // First expired came from batch 1 index 0
        assert_eq!(result.expired_receipts[0].command_index, 0);
        // Second expired came from batch 2 index 1
        assert_eq!(result.expired_receipts[1].command_index, 1);
    }

    #[test]
    fn drain_clears_queue() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        assert_eq!(q.len(), 2);
        let _ = q.drain(TickId(0));
        assert!(q.is_empty());
    }

    #[test]
    fn drain_frees_capacity_for_next_submit() {
        let mut q = IngressQueue::new(1);
        assert!(q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
        // Queue is full now.
        assert!(!q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
        let _ = q.drain(TickId(0));
        // Draining empties the buffer, so the next command fits again.
        assert!(q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
        assert_eq!(q.len(), 1);
    }

    // ── clear tests ────────────────────────────────────────────

    #[test]
    fn clear_discards_pending_commands() {
        let mut q = IngressQueue::new(2);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        assert_eq!(q.len(), 2);

        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert_eq!(q.capacity(), 2);

        // Nothing is left to drain, and the freed slots are usable again.
        let result = q.drain(TickId(0));
        assert!(result.commands.is_empty());
        assert!(result.expired_receipts.is_empty());
        assert!(q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
    }

    // ── proptest ───────────────────────────────────────────────

    mod proptests {
        use super::*;
        use proptest::prelude::*;

        /// The composite key `drain()` is documented to sort by.
        fn sort_key(cmd: &Command) -> (u8, u64, u64, u64) {
            (
                cmd.priority_class,
                cmd.source_id.unwrap_or(u64::MAX),
                cmd.source_seq.unwrap_or(u64::MAX),
                cmd.arrival_seq,
            )
        }

        fn arb_command() -> impl Strategy<Value = Command> {
            (
                0u8..4,
                any::<u64>(),
                prop::option::of(0u64..100),
                prop::option::of(0u64..100),
            )
                .prop_map(|(prio, expires, src_id, src_seq)| Command {
                    source_id: src_id,
                    source_seq: src_seq,
                    ..make_cmd(prio, expires)
                })
        }

        proptest! {
            #[test]
            fn drain_always_sorted(commands in prop::collection::vec(arb_command(), 0..64)) {
                let submitted = commands.len();
                let mut q = IngressQueue::new(128);
                q.submit(commands, false);
                let result = q.drain(TickId(0));

                // The batch fits in the queue and no TTL can be below tick 0,
                // so every command must come back out.
                prop_assert_eq!(result.commands.len(), submitted);
                prop_assert!(result.expired_receipts.is_empty());

                // Verify sort order
                for window in result.commands.windows(2) {
                    let key_a = sort_key(&window[0].command);
                    let key_b = sort_key(&window[1].command);
                    prop_assert!(key_a <= key_b, "sort violated: {key_a:?} > {key_b:?}");
                }
            }
        }
    }
}