Skip to main content

limen_core/
policy.rs

1//! Policies for batching, budgets, deadlines, admission, and capacities.
2
3use crate::types::{DeadlineNs, QoSClass, Ticks};
4
/// Configuration for a sliding window.
///
/// A sliding window produces overlapping (or non-overlapping) windows whose start
/// advances by `stride` items per step. Only the stride is stored here; the number
/// of items per window is not part of this type.
///
/// No validation is performed: every `usize` value, including `0`, is accepted as
/// a stride.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SlidingWindow {
    /// Number of items the window start advances between consecutive windows.
    stride: usize,
}

impl SlidingWindow {
    /// Creates a new sliding window configuration.
    ///
    /// `stride` is how many items the window start advances between consecutive windows.
    ///
    /// This is a `const fn`, so the configuration can be built in constant contexts
    /// (`const`/`static` items and other `const fn`s).
    #[must_use]
    #[inline]
    pub const fn new(stride: usize) -> Self {
        Self { stride }
    }

    /// Returns how many items the window start advances between consecutive windows.
    ///
    /// The value is handed out by reference; dereference it (`*window.stride()`) to
    /// obtain the plain `usize`.
    #[must_use]
    #[inline]
    pub const fn stride(&self) -> &usize {
        &self.stride
    }
}
30
31/// How batches are formed over an input stream.
32///
33/// The window policy controls how items from a stream are grouped into `Window`s.
34///
35/// - [`WindowKind::Disjoint`] (default): non-overlapping windows; each window consumes
36///   `size` items from the stream and the next window begins immediately after.
37/// - [`WindowKind::Sliding`]: fixed-size windows that advance by a fixed step. Each
38///   produced window contains `size` items, and the next window starts `stride` items
39///   after the previous window start. When `stride == size`, this is equivalent to
40///   [`WindowKind::Disjoint`].
41///
42/// For [`WindowKind::Sliding`], `stride <= size` is expected.
43#[non_exhaustive]
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum WindowKind {
46    /// Non-overlapping (disjoint) windows; default.
47    Disjoint,
48    /// Sliding windows: each produced window has `size` items and windows advance
49    /// by `stride` items between steps.
50    Sliding(SlidingWindow),
51}
52
53/// Batch formation policy: fixed-N and/or Δt micro-batching.
54#[non_exhaustive]
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct BatchingPolicy {
57    /// Fixed number of items per batch (>= 1). If `None`, do not use fixed-N.
58    fixed_n: Option<usize>,
59
60    /// Maximum micro-batching window in ticks (Δt). If `None`, no Δt cap.
61    max_delta_t: Option<Ticks>,
62
63    /// Windowing style: disjoint (default) or sliding (opt-in).
64    window_kind: WindowKind,
65}
66
67impl BatchingPolicy {
68    /// No batching (batch size 1).
69    pub const fn none() -> Self {
70        Self {
71            fixed_n: Some(1),
72            max_delta_t: None,
73            window_kind: WindowKind::Disjoint,
74        }
75    }
76
77    /// Fixed-N batching.
78    pub const fn fixed(n: usize) -> Self {
79        Self {
80            fixed_n: Some(n),
81            max_delta_t: None,
82            window_kind: WindowKind::Disjoint,
83        }
84    }
85
86    /// Δt-bounded micro-batching (runtime may still emit batch < N if time cap hits).
87    pub const fn delta_t(cap: Ticks) -> Self {
88        Self {
89            fixed_n: None,
90            max_delta_t: Some(cap),
91            window_kind: WindowKind::Disjoint,
92        }
93    }
94
95    /// Combined fixed-N and Δt caps.
96    pub const fn fixed_and_delta_t(n: usize, cap: Ticks) -> Self {
97        Self {
98            fixed_n: Some(n),
99            max_delta_t: Some(cap),
100            window_kind: WindowKind::Disjoint,
101        }
102    }
103
104    /// Combined fixed-N and Δt caps with explicit window kind (e.g., sliding).
105    #[inline]
106    pub const fn with_window(
107        n: Option<usize>,
108        cap: Option<Ticks>,
109        window_kind: WindowKind,
110    ) -> Self {
111        Self {
112            fixed_n: n,
113            max_delta_t: cap,
114            window_kind,
115        }
116    }
117
118    /// Convenience: fixed-N with explicit window kind.
119    #[inline]
120    pub const fn fixed_with_window(n: usize, window_kind: WindowKind) -> Self {
121        Self {
122            fixed_n: Some(n),
123            max_delta_t: None,
124            window_kind,
125        }
126    }
127
128    /// Convenience: delta-t with explicit window kind.
129    #[inline]
130    pub const fn delta_t_with_window(cap: Ticks, window_kind: WindowKind) -> Self {
131        Self {
132            fixed_n: None,
133            max_delta_t: Some(cap),
134            window_kind,
135        }
136    }
137
138    /// Borrow the fixed-N batching value (if any).
139    #[inline]
140    pub const fn fixed_n(&self) -> &Option<usize> {
141        &self.fixed_n
142    }
143
144    /// Borrow the delta-t cap (if any).
145    #[inline]
146    pub const fn max_delta_t(&self) -> &Option<Ticks> {
147        &self.max_delta_t
148    }
149
150    /// Return the window kind.
151    #[inline]
152    pub const fn window_kind(&self) -> WindowKind {
153        self.window_kind
154    }
155}
156
157impl Default for BatchingPolicy {
158    fn default() -> Self {
159        BatchingPolicy::none()
160    }
161}
162
163/// Budget policy for node execution.
164#[non_exhaustive]
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
166pub struct BudgetPolicy {
167    /// Per-step tick budget; exceeding invokes over-budget action.
168    tick_budget: Option<Ticks>,
169    /// *Hard* guardrail for a single step. If exceeded, the runtime may call
170    /// `Node::on_watchdog_timeout` and/or apply `OverBudgetAction`.
171    ///
172    /// Keep in no_std: only requires a monotonic clock; no OS timers.
173    watchdog_ticks: Option<Ticks>,
174}
175
176impl BudgetPolicy {
177    /// Construct a new budget policy.
178    pub const fn new(tick_budget: Option<Ticks>, watchdog_ticks: Option<Ticks>) -> Self {
179        Self {
180            tick_budget,
181            watchdog_ticks,
182        }
183    }
184
185    /// Borrow the tick budget.
186    #[inline]
187    pub const fn tick_budget(&self) -> &Option<Ticks> {
188        &self.tick_budget
189    }
190
191    /// Borrow the watchdog ticks.
192    #[inline]
193    pub const fn watchdog_ticks(&self) -> &Option<Ticks> {
194        &self.watchdog_ticks
195    }
196}
197
198/// Deadline policy for messages processed by a node.
199#[non_exhaustive]
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
201pub struct DeadlinePolicy {
202    /// Whether to require absolute deadlines on inputs (P2).
203    require_absolute_deadline: bool,
204    /// Optional slack tolerance (ns) before defaulting/degrading.
205    slack_tolerance_ns: Option<DeadlineNs>,
206    /// Synthesize a deadline when inputs have none: absolute_deadline = now + value.
207    /// Leave `None` to avoid synthesizing (strict mode or non-EDF).
208    default_deadline_ns: Option<DeadlineNs>,
209}
210
211impl DeadlinePolicy {
212    /// Construct a deadline policy.
213    pub const fn new(
214        require_absolute_deadline: bool,
215        slack_tolerance_ns: Option<DeadlineNs>,
216        default_deadline_ns: Option<DeadlineNs>,
217    ) -> Self {
218        Self {
219            require_absolute_deadline,
220            slack_tolerance_ns,
221            default_deadline_ns,
222        }
223    }
224
225    /// Borrow the require_absolute_deadline bool.
226    #[inline]
227    pub const fn require_absolute_deadline(&self) -> bool {
228        self.require_absolute_deadline
229    }
230
231    /// Borrow the slack_tolerance_ns.
232    #[inline]
233    pub const fn slack_tolerance_ns(&self) -> &Option<DeadlineNs> {
234        &self.slack_tolerance_ns
235    }
236
237    /// Borrow the default_deadline_ns.
238    #[inline]
239    pub const fn default_deadline_ns(&self) -> &Option<DeadlineNs> {
240        &self.default_deadline_ns
241    }
242}
243
/// What to do once a budget or deadline has been breached.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverBudgetAction {
    /// Discard the affected message(s).
    Drop,
    /// Skip this stage and, where applicable, signal upstream.
    SkipStage,
    /// Fall back to a faster path (e.g., a quantized model).
    Degrade,
    /// Emit a default value instead.
    DefaultOnTimeout,
}
257
/// Queue capacity and watermark configuration.
///
/// The `soft_*` fields are backpressure **watermarks**; the `max_*` fields are
/// **hard caps**. Watermark state is derived from live occupancy snapshots and
/// drives scheduling/backpressure.
///
/// The values are stored as given; no relationship between soft and hard values is
/// checked here.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueCaps {
    /// Hard cap on the number of queued items.
    pub max_items: usize,
    /// Soft watermark for items; admission adapts between soft and hard.
    pub soft_items: usize,
    /// Optional hard cap on the queued payload size in bytes.
    pub max_bytes: Option<usize>,
    /// Optional soft watermark in bytes.
    pub soft_bytes: Option<usize>,
}

impl QueueCaps {
    /// Builds queue caps from the item limits and the optional byte limits.
    pub const fn new(
        max_items: usize,
        soft_items: usize,
        max_bytes: Option<usize>,
        soft_bytes: Option<usize>,
    ) -> Self {
        QueueCaps {
            max_items,
            soft_items,
            max_bytes,
            soft_bytes,
        }
    }

    /// Borrows the hard item cap.
    #[inline]
    pub const fn max_items(&self) -> &usize {
        &self.max_items
    }

    /// Borrows the soft item watermark.
    #[inline]
    pub const fn soft_items(&self) -> &usize {
        &self.soft_items
    }

    /// Borrows the hard byte cap (`None` when there is none).
    #[inline]
    pub const fn max_bytes(&self) -> &Option<usize> {
        &self.max_bytes
    }

    /// Borrows the soft byte watermark (`None` when there is none).
    #[inline]
    pub const fn soft_bytes(&self) -> &Option<usize> {
        &self.soft_bytes
    }

    /// Returns `true` if the given occupancy is below the soft watermarks.
    ///
    /// `items` must be strictly less than `soft_items`. The byte watermark only
    /// takes part when **both** `max_bytes` and `soft_bytes` are set; in that case
    /// `bytes` must be strictly less than `soft_bytes`. Otherwise bytes are ignored.
    pub fn below_soft(&self, items: usize, bytes: usize) -> bool {
        // The soft byte mark counts only if a hard byte cap is configured as well.
        let byte_mark_reached = matches!(
            (self.max_bytes, self.soft_bytes),
            (Some(_), Some(soft)) if bytes >= soft
        );
        items < self.soft_items && !byte_mark_reached
    }

    /// Returns `true` if the occupancy is at or above a hard cap.
    ///
    /// That is the case when `items >= max_items`, or when a byte cap is set and
    /// `bytes >= max_bytes`.
    pub fn at_or_above_hard(&self, items: usize, bytes: usize) -> bool {
        let item_cap_hit = items >= self.max_items;
        let byte_cap_hit = matches!(self.max_bytes, Some(cap) if bytes >= cap);
        item_cap_hit || byte_cap_hit
    }
}
338
/// Pressure level of a queue, derived from its occupancy and its caps.
///
/// The derived ordering follows declaration order, from least to most pressure.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum WatermarkState {
    /// Occupancy is below the soft watermarks.
    BelowSoft,
    /// Occupancy lies between the soft and the hard thresholds.
    BetweenSoftAndHard,
    /// Occupancy has reached or passed a hard cap.
    AtOrAboveHard,
}
350
/// How an edge admits items once its queue is no longer clearly below the soft caps.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionPolicy {
    /// Admission between soft and hard caps takes deadlines and QoS into account.
    DeadlineAndQoSAware,
    /// Under pressure, simply drop the newest item.
    DropNewest,
    /// Under pressure, simply drop the oldest item.
    DropOldest,
    /// Block the producer until space is available (P2 only).
    Block,
}
364
/// Outcome of an admission check (`EdgePolicy::decide`); pure and side-effect free.
///
/// The decision only describes what should happen; the caller carries it out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionDecision {
    /// Take the incoming item; nothing is evicted.
    Admit,

    /// Refuse the incoming item.
    Reject,

    /// Drop the incoming (newest) item; the queue stays unchanged.
    DropNewest,

    /// Evict the `n` oldest items and then admit (the caller performs the evictions).
    /// `n` can be 1 for BetweenSoftAndHard with DropOldest.
    Evict(usize),

    /// Keep evicting the oldest items until the queue is below the hard watermark,
    /// then admit if possible. The caller performs the repeated evictions.
    EvictUntilBelowHard,

    /// Block the producer until space becomes available (edge decides how to block).
    Block,
}
389
390/// Per-edge policy bundle.
391///
392/// `caps` → soft/hard watermarks; `admission` → behavior between soft/hard;
393/// `over_budget` → action when capacity/budget constraints are breached.
394#[non_exhaustive]
395#[derive(Debug, Clone, Copy, PartialEq, Eq)]
396pub struct EdgePolicy {
397    /// Capacity and watermarks.
398    pub caps: QueueCaps,
399    /// Admission behavior between soft and hard thresholds.
400    pub admission: AdmissionPolicy,
401    /// Action to take on over-budget scenarios at this edge.
402    pub over_budget: OverBudgetAction,
403}
404
405impl EdgePolicy {
406    /// Construct a new `EdgePolicy`.
407    pub const fn new(
408        caps: QueueCaps,
409        admission: AdmissionPolicy,
410        over_budget: OverBudgetAction,
411    ) -> Self {
412        Self {
413            caps,
414            admission,
415            over_budget,
416        }
417    }
418
419    /// Borrow caps.
420    #[inline]
421    pub const fn caps(&self) -> &QueueCaps {
422        &self.caps
423    }
424
425    /// Borrow admission policy.
426    #[inline]
427    pub const fn admission(&self) -> &AdmissionPolicy {
428        &self.admission
429    }
430
431    /// Borrow over-budget action.
432    #[inline]
433    pub const fn over_budget(&self) -> &OverBudgetAction {
434        &self.over_budget
435    }
436
437    /// Compute a watermark state from occupancy stats.
438    pub fn watermark(&self, items: usize, bytes: usize) -> WatermarkState {
439        if self.caps.at_or_above_hard(items, bytes) {
440            WatermarkState::AtOrAboveHard
441        } else if self.caps.below_soft(items, bytes) {
442            WatermarkState::BelowSoft
443        } else {
444            WatermarkState::BetweenSoftAndHard
445        }
446    }
447
448    /// Apply admission logic based on header hints (deadline/qos) and occupancy.
449    ///
450    /// # Behaviour
451    ///
452    /// - `BelowSoft`  => `Admit`.
453    /// - `BetweenSoftAndHard`:
454    ///     - `DeadlineAndQoSAware` => consult deadline/qos to decide (TODO: full impl).
455    ///     - `DropNewest` => `DropNewest`.
456    ///     - `DropOldest` => `Evict(1)`.
457    ///     - `Block` => `Block`.
458    /// - `AtOrAboveHard`:
459    ///     - If the *single* incoming item cannot fit under the hard cap (even when
460    ///       the queue is empty), the item is `Reject`ed immediately.
461    ///     - Otherwise:
462    ///         - `DropNewest` => `DropNewest`.
463    ///         - `DropOldest` => `EvictUntilBelowHard`.
464    ///         - `DeadlineAndQoSAware` => conservative default `Reject` (or consult
465    ///           deadline/qos if implemented).
466    ///         - `Block` => `Block`.
467    ///
468    /// # Warning
469    ///
470    /// QoS-aware behaviour is **not yet implemented**.  The `DeadlineAndQoSAware`
471    /// branch currently uses conservative defaults:
472    /// - between soft/hard it defaults to `Admit`, and
473    /// - at-or-above-hard it defaults to `Reject`.
474    ///
475    /// Implementors should update this method to use deadline and QoS hints
476    /// when the scheduler rules are available.
477    pub fn decide(
478        &self,
479        items: usize,
480        bytes: usize,
481        item_bytes: usize,
482        _deadline: Option<DeadlineNs>,
483        _qos: QoSClass,
484    ) -> AdmissionDecision {
485        match self.watermark(items, bytes) {
486            WatermarkState::BelowSoft => AdmissionDecision::Admit,
487
488            WatermarkState::BetweenSoftAndHard => match self.admission {
489                AdmissionPolicy::DeadlineAndQoSAware => {
490                    // TODO: implement full deadline/qos logic. For now, admit.
491                    AdmissionDecision::Admit
492                }
493                AdmissionPolicy::DropNewest => AdmissionDecision::DropNewest,
494                AdmissionPolicy::DropOldest => AdmissionDecision::Evict(1),
495                AdmissionPolicy::Block => AdmissionDecision::Block,
496            },
497
498            WatermarkState::AtOrAboveHard => {
499                // If item alone cannot fit under hard caps, reject immediately.
500                if self.caps.at_or_above_hard(0, item_bytes) {
501                    return AdmissionDecision::Reject;
502                }
503
504                match self.admission {
505                    AdmissionPolicy::DeadlineAndQoSAware => {
506                        // Conservative default: reject at or above hard.
507                        // Or evaluate deadline/qos here and map to DropOldest/DropNewest.
508                        AdmissionDecision::Reject
509                    }
510                    AdmissionPolicy::DropNewest => AdmissionDecision::DropNewest,
511                    AdmissionPolicy::DropOldest => AdmissionDecision::EvictUntilBelowHard,
512                    AdmissionPolicy::Block => AdmissionDecision::Block,
513                }
514            }
515        }
516    }
517}
518
519/// Policy bundle attached to a node.
520///
521/// Used by schedulers:
522/// - `batching.fixed_n`/`max_delta_t` guide batch formation.
523/// - `budget.tick_budget` (soft) and `budget.watchdog_ticks` (hard) guide time budgeting.
524/// - `deadline.default_deadline_ns` allows EDF synthesis when inputs have no deadlines,
525///   `deadline.slack_tolerance_ns` provides grace, and `require_absolute_deadline` enforces strictness.
526#[non_exhaustive]
527#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
528pub struct NodePolicy {
529    /// Batch formation policy.
530    batching: BatchingPolicy,
531    /// Budget policy for execution steps.
532    budget: BudgetPolicy,
533    /// Deadline policy for inputs/outputs.
534    deadline: DeadlinePolicy,
535}
536
537impl NodePolicy {
538    /// Construct a `NodePolicy` explicitly.
539    pub const fn new(
540        batching: BatchingPolicy,
541        budget: BudgetPolicy,
542        deadline: DeadlinePolicy,
543    ) -> Self {
544        Self {
545            batching,
546            budget,
547            deadline,
548        }
549    }
550
551    /// Borrow the batching policy.
552    #[inline]
553    pub const fn batching(&self) -> &BatchingPolicy {
554        &self.batching
555    }
556
557    /// Borrow the budget policy.
558    #[inline]
559    pub const fn budget(&self) -> &BudgetPolicy {
560        &self.budget
561    }
562
563    /// Borrow the deadline policy.
564    #[inline]
565    pub const fn deadline(&self) -> &DeadlinePolicy {
566        &self.deadline
567    }
568}