Skip to main content

murk_engine/
ingress.rs

//! Bounded ingress queue with deterministic ordering and TTL evaluation.
//!
//! [`IngressQueue`] buffers commands between submission and tick execution.
//! It enforces capacity limits, assigns monotonic arrival sequence numbers,
//! evaluates TTL expiry, and sorts commands into a deterministic order for
//! processing by the tick engine.
//!
//! # Ordering
//!
//! Commands are sorted by the composite key:
//! `(priority_class, source_id|MAX, source_seq|MAX, arrival_seq)`
//!
//! Here `x|MAX` stands for `x.unwrap_or(u64::MAX)`: an absent field sorts last.
//!
//! This ensures:
//! - Lower priority class values execute first (0 = system, 1 = user).
//! - Within a priority class, source-keyed commands sort before anonymous ones.
//! - Source-keyed commands from the same source execute in sequence order.
//! - Anonymous commands execute in arrival order.
18
19use std::collections::VecDeque;
20
21use murk_core::command::{Command, Receipt};
22use murk_core::error::IngressError;
23use murk_core::id::TickId;
24
25/// A command paired with its original batch-local index from `submit()`.
26///
27/// Returned in [`DrainResult::commands`] so the tick engine can build
28/// receipts with correct `command_index` values even after priority reordering.
29#[derive(Debug)]
30pub struct DrainedCommand {
31    /// The command to execute.
32    pub command: Command,
33    /// The original batch-local index from the `submit()` call.
34    pub command_index: usize,
35}
36
37/// Result of draining the queue at the start of a tick.
38#[derive(Debug)]
39pub struct DrainResult {
40    /// Commands that passed TTL checks, sorted in deterministic order.
41    pub commands: Vec<DrainedCommand>,
42    /// Receipts for commands that expired before reaching the current tick.
43    pub expired_receipts: Vec<Receipt>,
44}
45
46/// A command paired with its original batch-local index.
47///
48/// Preserves the `command_index` from the `submit()` call so that
49/// `drain()` can produce correct receipts for expired commands.
50struct QueueEntry {
51    command: Command,
52    command_index: usize,
53}
54
55/// Bounded command queue for the ingress pipeline.
56///
57/// Accepts batches of commands via [`submit()`](IngressQueue::submit),
58/// assigns monotonic arrival sequence numbers, and produces a sorted,
59/// TTL-filtered batch via [`drain()`](IngressQueue::drain).
60pub struct IngressQueue {
61    queue: VecDeque<QueueEntry>,
62    capacity: usize,
63    next_arrival_seq: u64,
64}
65
66impl IngressQueue {
67    /// Create a new queue with the given capacity.
68    ///
69    /// # Panics
70    ///
71    /// Panics if `capacity` is zero.
72    pub fn new(capacity: usize) -> Self {
73        assert!(capacity > 0, "IngressQueue capacity must be at least 1");
74        Self {
75            queue: VecDeque::with_capacity(capacity),
76            capacity,
77            next_arrival_seq: 0,
78        }
79    }
80
81    /// Submit a batch of commands to the queue.
82    ///
83    /// Returns one [`Receipt`] per input command. Commands are accepted
84    /// in order until the queue is full; remaining commands receive
85    /// `QueueFull` receipts. If `tick_disabled` is true, all commands
86    /// are rejected with `TickDisabled`.
87    ///
88    /// Arrival sequence numbers are assigned from a monotonic counter
89    /// that persists across submit calls, overwriting whatever value
90    /// the caller may have set on `Command::arrival_seq`.
91    pub fn submit(&mut self, commands: Vec<Command>, tick_disabled: bool) -> Vec<Receipt> {
92        let mut receipts = Vec::with_capacity(commands.len());
93
94        for (i, mut cmd) in commands.into_iter().enumerate() {
95            if tick_disabled {
96                receipts.push(Receipt {
97                    accepted: false,
98                    applied_tick_id: None,
99                    reason_code: Some(IngressError::TickDisabled),
100                    command_index: i,
101                });
102                continue;
103            }
104
105            if self.queue.len() >= self.capacity {
106                receipts.push(Receipt {
107                    accepted: false,
108                    applied_tick_id: None,
109                    reason_code: Some(IngressError::QueueFull),
110                    command_index: i,
111                });
112                continue;
113            }
114
115            // Normalize: anonymous commands must not carry a source_seq.
116            if cmd.source_id.is_none() {
117                cmd.source_seq = None;
118            }
119            cmd.arrival_seq = self.next_arrival_seq;
120            self.next_arrival_seq += 1;
121            self.queue.push_back(QueueEntry {
122                command: cmd,
123                command_index: i,
124            });
125
126            receipts.push(Receipt {
127                accepted: true,
128                applied_tick_id: None,
129                reason_code: None,
130                command_index: i,
131            });
132        }
133
134        receipts
135    }
136
137    /// Drain the queue, filtering expired commands and sorting the rest.
138    ///
139    /// A command is expired if `cmd.expires_after_tick < current_tick`.
140    /// A command with `expires_after_tick == current_tick` is **valid**
141    /// during that tick.
142    ///
143    /// Returns a [`DrainResult`] containing the sorted valid commands
144    /// and receipts for expired commands.
145    pub fn drain(&mut self, current_tick: TickId) -> DrainResult {
146        let mut valid = Vec::new();
147        let mut expired_receipts = Vec::new();
148
149        for entry in self.queue.drain(..) {
150            if entry.command.expires_after_tick.0 < current_tick.0 {
151                expired_receipts.push(Receipt {
152                    accepted: true,
153                    applied_tick_id: None,
154                    reason_code: Some(IngressError::Stale),
155                    command_index: entry.command_index,
156                });
157            } else {
158                valid.push(DrainedCommand {
159                    command: entry.command,
160                    command_index: entry.command_index,
161                });
162            }
163        }
164
165        // Deterministic sort: (priority_class, source_id|MAX, source_seq|MAX, arrival_seq)
166        valid.sort_unstable_by_key(|dc| {
167            let c = &dc.command;
168            (
169                c.priority_class,
170                c.source_id.unwrap_or(u64::MAX),
171                // source_seq only meaningful when source_id is present
172                if c.source_id.is_some() {
173                    c.source_seq.unwrap_or(u64::MAX)
174                } else {
175                    u64::MAX
176                },
177                c.arrival_seq,
178            )
179        });
180
181        DrainResult {
182            commands: valid,
183            expired_receipts,
184        }
185    }
186
187    /// Number of commands currently buffered.
188    pub fn len(&self) -> usize {
189        self.queue.len()
190    }
191
192    /// Whether the queue is empty.
193    pub fn is_empty(&self) -> bool {
194        self.queue.is_empty()
195    }
196
197    /// Maximum number of commands this queue can hold.
198    pub fn capacity(&self) -> usize {
199        self.capacity
200    }
201
202    /// Discard all pending commands.
203    ///
204    /// Called during [`TickEngine::reset()`](crate::TickEngine::reset) so
205    /// stale commands from previous ticks don't survive a reset.
206    pub fn clear(&mut self) {
207        self.queue.clear();
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::command::CommandPayload;
    use murk_core::id::ParameterKey;

    /// Build an anonymous command (no source id / sequence) with the given
    /// priority class and `expires_after_tick`.
    fn make_cmd(priority: u8, expires: u64) -> Command {
        Command {
            payload: CommandPayload::SetParameter {
                key: ParameterKey(0),
                value: 0.0,
            },
            expires_after_tick: TickId(expires),
            source_id: None,
            source_seq: None,
            priority_class: priority,
            arrival_seq: 0,
        }
    }

    /// Build a source-keyed command with the given priority class, source
    /// id, source sequence number and `expires_after_tick`.
    fn make_sourced_cmd(priority: u8, source_id: u64, source_seq: u64, expires: u64) -> Command {
        Command {
            payload: CommandPayload::SetParameter {
                key: ParameterKey(0),
                value: 0.0,
            },
            expires_after_tick: TickId(expires),
            source_id: Some(source_id),
            source_seq: Some(source_seq),
            priority_class: priority,
            arrival_seq: 0,
        }
    }

    // ── construction tests ─────────────────────────────────────

    #[test]
    #[should_panic(expected = "IngressQueue capacity must be at least 1")]
    fn new_rejects_zero_capacity() {
        let _ = IngressQueue::new(0);
    }

    #[test]
    fn capacity_reports_configured_limit() {
        let q = IngressQueue::new(7);
        assert_eq!(q.capacity(), 7);
        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
    }

    // ── submit tests ───────────────────────────────────────────

    #[test]
    fn submit_assigns_monotonic_arrival_seq() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts.len(), 3);
        assert!(receipts.iter().all(|r| r.accepted));

        // Drain and check arrival_seq 0, 1, 2
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.arrival_seq, 0);
        assert_eq!(result.commands[1].command.arrival_seq, 1);
        assert_eq!(result.commands[2].command.arrival_seq, 2);
    }

    #[test]
    fn submit_overwrites_caller_arrival_seq() {
        let mut q = IngressQueue::new(10);
        let mut cmd = make_cmd(1, 100);
        cmd.arrival_seq = 999;
        q.submit(vec![cmd], false);
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.arrival_seq, 0);
    }

    #[test]
    fn submit_clears_source_seq_on_anonymous_commands() {
        let mut q = IngressQueue::new(10);
        // Anonymous command that (incorrectly) carries a source_seq.
        let mut cmd = make_cmd(1, 100);
        cmd.source_seq = Some(7);
        q.submit(vec![cmd], false);
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.source_id, None);
        assert_eq!(result.commands[0].command.source_seq, None);
    }

    #[test]
    fn submit_rejects_when_full() {
        let mut q = IngressQueue::new(2);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert!(receipts[0].accepted);
        assert!(receipts[1].accepted);
        assert!(!receipts[2].accepted);
        assert_eq!(receipts[2].reason_code, Some(IngressError::QueueFull));
    }

    #[test]
    fn submit_rejects_when_tick_disabled() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, true);
        assert!(!receipts[0].accepted);
        assert_eq!(receipts[0].reason_code, Some(IngressError::TickDisabled));
        assert!(!receipts[1].accepted);
        assert_eq!(receipts[1].reason_code, Some(IngressError::TickDisabled));
        assert!(q.is_empty());
    }

    #[test]
    fn submit_partial_accept_on_overflow() {
        let mut q = IngressQueue::new(3);
        let cmds = vec![
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
            make_cmd(1, 100),
        ];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts.len(), 5);
        assert!(receipts[0].accepted);
        assert!(receipts[1].accepted);
        assert!(receipts[2].accepted);
        assert!(!receipts[3].accepted);
        assert_eq!(receipts[3].reason_code, Some(IngressError::QueueFull));
        assert!(!receipts[4].accepted);
        assert_eq!(receipts[4].reason_code, Some(IngressError::QueueFull));
        assert_eq!(q.len(), 3);
    }

    #[test]
    fn submit_accepts_again_after_drain_frees_capacity() {
        let mut q = IngressQueue::new(1);
        assert!(q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
        assert!(!q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
        let _ = q.drain(TickId(0));
        assert!(q.submit(vec![make_cmd(1, 100)], false)[0].accepted);
    }

    #[test]
    fn arrival_seq_persists_across_submits() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        q.submit(vec![make_cmd(1, 100)], false);
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.arrival_seq, 0);
        assert_eq!(result.commands[1].command.arrival_seq, 1);
        assert_eq!(result.commands[2].command.arrival_seq, 2);
    }

    #[test]
    fn receipt_command_index_matches_input() {
        let mut q = IngressQueue::new(10);
        let cmds = vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)];
        let receipts = q.submit(cmds, false);
        assert_eq!(receipts[0].command_index, 0);
        assert_eq!(receipts[1].command_index, 1);
        assert_eq!(receipts[2].command_index, 2);
    }

    // ── drain tests ────────────────────────────────────────────

    #[test]
    fn drain_removes_expired_commands() {
        let mut q = IngressQueue::new(10);
        // Expires after tick 3 → expired at tick 4
        q.submit(vec![make_cmd(1, 3)], false);
        let result = q.drain(TickId(4));
        assert!(result.commands.is_empty());
        assert_eq!(result.expired_receipts.len(), 1);
        assert_eq!(
            result.expired_receipts[0].reason_code,
            Some(IngressError::Stale)
        );
    }

    #[test]
    fn drain_keeps_valid_commands() {
        let mut q = IngressQueue::new(10);
        // Expires after tick 10 → valid at tick 5
        q.submit(vec![make_cmd(1, 10)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 1);
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_boundary_expires_after_equals_current() {
        let mut q = IngressQueue::new(10);
        // expires_after_tick == 5, drain at tick 5 → VALID (still on that tick)
        q.submit(vec![make_cmd(1, 5)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 1);
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_sorts_by_priority() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![make_cmd(2, 100), make_cmd(0, 100), make_cmd(1, 100)],
            false,
        );
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.priority_class, 0);
        assert_eq!(result.commands[1].command.priority_class, 1);
        assert_eq!(result.commands[2].command.priority_class, 2);
    }

    #[test]
    fn drain_preserves_command_index_after_reordering() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![make_cmd(2, 100), make_cmd(0, 100), make_cmd(1, 100)],
            false,
        );
        let result = q.drain(TickId(0));
        // Sorted by priority 0, 1, 2 → original batch slots 1, 2, 0.
        assert_eq!(result.commands[0].command_index, 1);
        assert_eq!(result.commands[1].command_index, 2);
        assert_eq!(result.commands[2].command_index, 0);
    }

    #[test]
    fn drain_sorts_by_source_within_priority() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![
                make_sourced_cmd(1, 10, 2, 100),
                make_sourced_cmd(1, 10, 1, 100),
                make_sourced_cmd(1, 5, 0, 100),
            ],
            false,
        );
        let result = q.drain(TickId(0));
        // source_id 5 < 10, so it comes first
        assert_eq!(result.commands[0].command.source_id, Some(5));
        assert_eq!(result.commands[0].command.source_seq, Some(0));
        // Then source_id 10 seq 1 before seq 2
        assert_eq!(result.commands[1].command.source_id, Some(10));
        assert_eq!(result.commands[1].command.source_seq, Some(1));
        assert_eq!(result.commands[2].command.source_id, Some(10));
        assert_eq!(result.commands[2].command.source_seq, Some(2));
    }

    #[test]
    fn drain_sorts_by_arrival_seq_when_no_source() {
        let mut q = IngressQueue::new(10);
        // All same priority, no source → sorted by arrival_seq
        q.submit(
            vec![make_cmd(1, 100), make_cmd(1, 100), make_cmd(1, 100)],
            false,
        );
        let result = q.drain(TickId(0));
        assert_eq!(result.commands[0].command.arrival_seq, 0);
        assert_eq!(result.commands[1].command.arrival_seq, 1);
        assert_eq!(result.commands[2].command.arrival_seq, 2);
    }

    #[test]
    fn drain_mixed_source_and_no_source() {
        let mut q = IngressQueue::new(10);
        q.submit(
            vec![
                make_cmd(1, 100),               // no source → source_id=MAX
                make_sourced_cmd(1, 5, 0, 100), // source_id=5
            ],
            false,
        );
        let result = q.drain(TickId(0));
        // source_id 5 < u64::MAX, so sourced command comes first
        assert_eq!(result.commands[0].command.source_id, Some(5));
        assert_eq!(result.commands[1].command.source_id, None);
    }

    #[test]
    fn drain_empty_queue() {
        let mut q = IngressQueue::new(10);
        let result = q.drain(TickId(0));
        assert!(result.commands.is_empty());
        assert!(result.expired_receipts.is_empty());
    }

    #[test]
    fn drain_all_expired() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 0), make_cmd(1, 1), make_cmd(1, 2)], false);
        let result = q.drain(TickId(10));
        assert!(result.commands.is_empty());
        assert_eq!(result.expired_receipts.len(), 3);
    }

    #[test]
    fn drain_expired_receipts_preserve_command_index() {
        let mut q = IngressQueue::new(10);
        // Submit a batch of 4 commands; the middle two will expire.
        q.submit(
            vec![
                make_cmd(1, 100), // index 0 — valid
                make_cmd(1, 2),   // index 1 — stale from tick 3 onwards
                make_cmd(1, 1),   // index 2 — stale from tick 2 onwards
                make_cmd(1, 100), // index 3 — valid
            ],
            false,
        );
        let result = q.drain(TickId(3));
        assert_eq!(result.commands.len(), 2);
        assert_eq!(result.expired_receipts.len(), 2);
        // Expired receipts carry their original batch indices, not 0.
        assert_eq!(result.expired_receipts[0].command_index, 1);
        assert_eq!(result.expired_receipts[1].command_index, 2);
    }

    #[test]
    fn drain_expired_receipts_across_batches() {
        let mut q = IngressQueue::new(10);
        // Batch 1: indices 0, 1
        q.submit(vec![make_cmd(1, 0), make_cmd(1, 100)], false);
        // Batch 2: indices 0, 1 (new batch, indices reset)
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 0)], false);
        let result = q.drain(TickId(5));
        assert_eq!(result.commands.len(), 2);
        assert_eq!(result.expired_receipts.len(), 2);
        // First expired came from batch 1 index 0
        assert_eq!(result.expired_receipts[0].command_index, 0);
        // Second expired came from batch 2 index 1
        assert_eq!(result.expired_receipts[1].command_index, 1);
    }

    #[test]
    fn drain_clears_queue() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        assert_eq!(q.len(), 2);
        let _ = q.drain(TickId(0));
        assert!(q.is_empty());
    }

    // ── clear tests ────────────────────────────────────────────

    #[test]
    fn clear_discards_pending_commands() {
        let mut q = IngressQueue::new(10);
        q.submit(vec![make_cmd(1, 100), make_cmd(1, 100)], false);
        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert_eq!(q.capacity(), 10);

        // Nothing from before the clear is left to drain.
        let result = q.drain(TickId(0));
        assert!(result.commands.is_empty());
        assert!(result.expired_receipts.is_empty());
    }

    // ── proptest ───────────────────────────────────────────────

    mod proptests {
        use super::*;
        use proptest::prelude::*;

        /// Arbitrary command: priority 0..4, any TTL, optional source id and
        /// optional source sequence (independently, so anonymous commands may
        /// arrive carrying a sequence number).
        fn arb_command() -> impl Strategy<Value = Command> {
            (
                0u8..4,
                any::<u64>(),
                prop::option::of(0u64..100),
                prop::option::of(0u64..100),
            )
                .prop_map(|(prio, expires, src_id, src_seq)| Command {
                    payload: CommandPayload::SetParameter {
                        key: ParameterKey(0),
                        value: 0.0,
                    },
                    expires_after_tick: TickId(expires),
                    source_id: src_id,
                    source_seq: src_seq,
                    priority_class: prio,
                    arrival_seq: 0,
                })
        }

        /// Expected ordering key, restated independently of `drain()`:
        /// `(priority_class, source_id|MAX, source_seq|MAX, arrival_seq)`.
        fn sort_key(c: &Command) -> (u8, u64, u64, u64) {
            (
                c.priority_class,
                c.source_id.unwrap_or(u64::MAX),
                if c.source_id.is_some() {
                    c.source_seq.unwrap_or(u64::MAX)
                } else {
                    u64::MAX
                },
                c.arrival_seq,
            )
        }

        proptest! {
            #[test]
            fn drain_always_sorted(commands in prop::collection::vec(arb_command(), 0..64)) {
                let submitted = commands.len();
                let mut q = IngressQueue::new(128);
                q.submit(commands, false);
                let result = q.drain(TickId(0));

                // The batch fits the queue and nothing can be stale at tick 0,
                // so every submitted command must come back out.
                prop_assert_eq!(result.commands.len(), submitted);
                prop_assert!(result.expired_receipts.is_empty());

                // Verify sort order
                for window in result.commands.windows(2) {
                    let key_a = sort_key(&window[0].command);
                    let key_b = sort_key(&window[1].command);
                    prop_assert!(key_a <= key_b, "sort violated: {key_a:?} > {key_b:?}");
                }
            }
        }
    }
}