limen_core/policy.rs
1//! Policies for batching, budgets, deadlines, admission, and capacities.
2
3use crate::types::{DeadlineNs, QoSClass, Ticks};
4
5/// Configuration for a sliding window.
6///
7/// A sliding window produces overlapping (or non-overlapping) windows of `size` items,
8/// advancing by `stride` items per step.
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub struct SlidingWindow {
11 /// Number of items the window start advances between consecutive windows.
12 stride: usize,
13}
14
15impl SlidingWindow {
16 /// Creates a new sliding window configuration.
17 ///
18 /// `stride` is how many items the window start advances between consecutive windows.
19 #[must_use]
20 pub fn new(stride: usize) -> Self {
21 Self { stride }
22 }
23
24 /// Returns how many items the window start advances between consecutive windows.
25 #[must_use]
26 pub fn stride(&self) -> &usize {
27 &self.stride
28 }
29}
30
31/// How batches are formed over an input stream.
32///
33/// The window policy controls how items from a stream are grouped into `Window`s.
34///
35/// - [`WindowKind::Disjoint`] (default): non-overlapping windows; each window consumes
36/// `size` items from the stream and the next window begins immediately after.
37/// - [`WindowKind::Sliding`]: fixed-size windows that advance by a fixed step. Each
38/// produced window contains `size` items, and the next window starts `stride` items
39/// after the previous window start. When `stride == size`, this is equivalent to
40/// [`WindowKind::Disjoint`].
41///
42/// For [`WindowKind::Sliding`], `stride <= size` is expected.
43#[non_exhaustive]
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum WindowKind {
46 /// Non-overlapping (disjoint) windows; default.
47 Disjoint,
48 /// Sliding windows: each produced window has `size` items and windows advance
49 /// by `stride` items between steps.
50 Sliding(SlidingWindow),
51}
52
53/// Batch formation policy: fixed-N and/or Δt micro-batching.
54#[non_exhaustive]
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct BatchingPolicy {
57 /// Fixed number of items per batch (>= 1). If `None`, do not use fixed-N.
58 fixed_n: Option<usize>,
59
60 /// Maximum micro-batching window in ticks (Δt). If `None`, no Δt cap.
61 max_delta_t: Option<Ticks>,
62
63 /// Windowing style: disjoint (default) or sliding (opt-in).
64 window_kind: WindowKind,
65}
66
67impl BatchingPolicy {
68 /// No batching (batch size 1).
69 pub const fn none() -> Self {
70 Self {
71 fixed_n: Some(1),
72 max_delta_t: None,
73 window_kind: WindowKind::Disjoint,
74 }
75 }
76
77 /// Fixed-N batching.
78 pub const fn fixed(n: usize) -> Self {
79 Self {
80 fixed_n: Some(n),
81 max_delta_t: None,
82 window_kind: WindowKind::Disjoint,
83 }
84 }
85
86 /// Δt-bounded micro-batching (runtime may still emit batch < N if time cap hits).
87 pub const fn delta_t(cap: Ticks) -> Self {
88 Self {
89 fixed_n: None,
90 max_delta_t: Some(cap),
91 window_kind: WindowKind::Disjoint,
92 }
93 }
94
95 /// Combined fixed-N and Δt caps.
96 pub const fn fixed_and_delta_t(n: usize, cap: Ticks) -> Self {
97 Self {
98 fixed_n: Some(n),
99 max_delta_t: Some(cap),
100 window_kind: WindowKind::Disjoint,
101 }
102 }
103
104 /// Combined fixed-N and Δt caps with explicit window kind (e.g., sliding).
105 #[inline]
106 pub const fn with_window(
107 n: Option<usize>,
108 cap: Option<Ticks>,
109 window_kind: WindowKind,
110 ) -> Self {
111 Self {
112 fixed_n: n,
113 max_delta_t: cap,
114 window_kind,
115 }
116 }
117
118 /// Convenience: fixed-N with explicit window kind.
119 #[inline]
120 pub const fn fixed_with_window(n: usize, window_kind: WindowKind) -> Self {
121 Self {
122 fixed_n: Some(n),
123 max_delta_t: None,
124 window_kind,
125 }
126 }
127
128 /// Convenience: delta-t with explicit window kind.
129 #[inline]
130 pub const fn delta_t_with_window(cap: Ticks, window_kind: WindowKind) -> Self {
131 Self {
132 fixed_n: None,
133 max_delta_t: Some(cap),
134 window_kind,
135 }
136 }
137
138 /// Borrow the fixed-N batching value (if any).
139 #[inline]
140 pub const fn fixed_n(&self) -> &Option<usize> {
141 &self.fixed_n
142 }
143
144 /// Borrow the delta-t cap (if any).
145 #[inline]
146 pub const fn max_delta_t(&self) -> &Option<Ticks> {
147 &self.max_delta_t
148 }
149
150 /// Return the window kind.
151 #[inline]
152 pub const fn window_kind(&self) -> WindowKind {
153 self.window_kind
154 }
155}
156
157impl Default for BatchingPolicy {
158 fn default() -> Self {
159 BatchingPolicy::none()
160 }
161}
162
163/// Budget policy for node execution.
164#[non_exhaustive]
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
166pub struct BudgetPolicy {
167 /// Per-step tick budget; exceeding invokes over-budget action.
168 tick_budget: Option<Ticks>,
169 /// *Hard* guardrail for a single step. If exceeded, the runtime may call
170 /// `Node::on_watchdog_timeout` and/or apply `OverBudgetAction`.
171 ///
172 /// Keep in no_std: only requires a monotonic clock; no OS timers.
173 watchdog_ticks: Option<Ticks>,
174}
175
176impl BudgetPolicy {
177 /// Construct a new budget policy.
178 pub const fn new(tick_budget: Option<Ticks>, watchdog_ticks: Option<Ticks>) -> Self {
179 Self {
180 tick_budget,
181 watchdog_ticks,
182 }
183 }
184
185 /// Borrow the tick budget.
186 #[inline]
187 pub const fn tick_budget(&self) -> &Option<Ticks> {
188 &self.tick_budget
189 }
190
191 /// Borrow the watchdog ticks.
192 #[inline]
193 pub const fn watchdog_ticks(&self) -> &Option<Ticks> {
194 &self.watchdog_ticks
195 }
196}
197
198/// Deadline policy for messages processed by a node.
199#[non_exhaustive]
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
201pub struct DeadlinePolicy {
202 /// Whether to require absolute deadlines on inputs (P2).
203 require_absolute_deadline: bool,
204 /// Optional slack tolerance (ns) before defaulting/degrading.
205 slack_tolerance_ns: Option<DeadlineNs>,
206 /// Synthesize a deadline when inputs have none: absolute_deadline = now + value.
207 /// Leave `None` to avoid synthesizing (strict mode or non-EDF).
208 default_deadline_ns: Option<DeadlineNs>,
209}
210
211impl DeadlinePolicy {
212 /// Construct a deadline policy.
213 pub const fn new(
214 require_absolute_deadline: bool,
215 slack_tolerance_ns: Option<DeadlineNs>,
216 default_deadline_ns: Option<DeadlineNs>,
217 ) -> Self {
218 Self {
219 require_absolute_deadline,
220 slack_tolerance_ns,
221 default_deadline_ns,
222 }
223 }
224
225 /// Borrow the require_absolute_deadline bool.
226 #[inline]
227 pub const fn require_absolute_deadline(&self) -> bool {
228 self.require_absolute_deadline
229 }
230
231 /// Borrow the slack_tolerance_ns.
232 #[inline]
233 pub const fn slack_tolerance_ns(&self) -> &Option<DeadlineNs> {
234 &self.slack_tolerance_ns
235 }
236
237 /// Borrow the default_deadline_ns.
238 #[inline]
239 pub const fn default_deadline_ns(&self) -> &Option<DeadlineNs> {
240 &self.default_deadline_ns
241 }
242}
243
244/// Action to take when budgets or deadlines are breached.
245#[non_exhaustive]
246#[derive(Debug, Clone, Copy, PartialEq, Eq)]
247pub enum OverBudgetAction {
248 /// Drop the message(s).
249 Drop,
250 /// Skip the stage and signal upstream (if applicable).
251 SkipStage,
252 /// Degrade to a faster path (e.g., quantized model).
253 Degrade,
254 /// Emit a default value.
255 DefaultOnTimeout,
256}
257
258/// Queue capacity and watermark configuration.
259///
260/// `soft_*` define backpressure **watermarks**; `max_*` define **hard caps**.
261/// Watermark state is derived from live occupancy snapshots and drives scheduling/backpressure.
262#[non_exhaustive]
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264pub struct QueueCaps {
265 /// Maximum number of items permitted (hard cap).
266 pub max_items: usize,
267 /// Soft watermark for items; admission adapts between soft and hard.
268 pub soft_items: usize,
269 /// Optional byte cap and soft watermark for payload sizes.
270 pub max_bytes: Option<usize>,
271 /// Optional soft watermark in bytes.
272 pub soft_bytes: Option<usize>,
273}
274
275impl QueueCaps {
276 /// Construct new queue caps.
277 pub const fn new(
278 max_items: usize,
279 soft_items: usize,
280 max_bytes: Option<usize>,
281 soft_bytes: Option<usize>,
282 ) -> Self {
283 Self {
284 max_items,
285 soft_items,
286 max_bytes,
287 soft_bytes,
288 }
289 }
290
291 /// Borrow max_items.
292 #[inline]
293 pub const fn max_items(&self) -> &usize {
294 &self.max_items
295 }
296
297 /// Borrow soft_items.
298 #[inline]
299 pub const fn soft_items(&self) -> &usize {
300 &self.soft_items
301 }
302
303 /// Borrow max_bytes.
304 #[inline]
305 pub const fn max_bytes(&self) -> &Option<usize> {
306 &self.max_bytes
307 }
308
309 /// Borrow soft_bytes.
310 #[inline]
311 pub const fn soft_bytes(&self) -> &Option<usize> {
312 &self.soft_bytes
313 }
314
315 /// Return `true` if the given occupancy is below soft watermarks.
316 pub fn below_soft(&self, items: usize, bytes: usize) -> bool {
317 let items_ok = items < self.soft_items;
318 let bytes_ok = match (self.max_bytes, self.soft_bytes) {
319 (Some(_), Some(soft)) => bytes < soft,
320 _ => true,
321 };
322 items_ok && bytes_ok
323 }
324
325 /// Return `true` if the occupancy is at or above hard cap.
326 pub fn at_or_above_hard(&self, items: usize, bytes: usize) -> bool {
327 if items >= self.max_items {
328 return true;
329 }
330 if let Some(maxb) = self.max_bytes {
331 if bytes >= maxb {
332 return true;
333 }
334 }
335 false
336 }
337}
338
339/// Watermark state derived from queue occupancy and caps.
340#[non_exhaustive]
341#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
342pub enum WatermarkState {
343 /// Well below soft watermarks.
344 BelowSoft,
345 /// Between soft and hard thresholds.
346 BetweenSoftAndHard,
347 /// At or above hard cap.
348 AtOrAboveHard,
349}
350
351/// Admission behavior when the queue is not clearly below soft caps.
352#[non_exhaustive]
353#[derive(Debug, Clone, Copy, PartialEq, Eq)]
354pub enum AdmissionPolicy {
355 /// Deadline-aware and QoS-aware admission between soft and hard caps.
356 DeadlineAndQoSAware,
357 /// Simple drop-newest under pressure.
358 DropNewest,
359 /// Simple drop-oldest under pressure.
360 DropOldest,
361 /// Block producer until space is available (P2 only).
362 Block,
363}
364
365/// Decision returned by an admission controller (`EdgePolicy::decide`); pure and side-effect free.
366#[non_exhaustive]
367#[derive(Debug, Clone, Copy, PartialEq, Eq)]
368pub enum AdmissionDecision {
369 /// Accept the incoming item without evicting anything.
370 Admit,
371
372 /// Refuse the incoming item.
373 Reject,
374
375 /// Drop the incoming (newest) item; queue unchanged.
376 DropNewest,
377
378 /// Evict `n` oldest items, then admit (caller performs evictions).
379 /// `n` can be 1 for BetweenSoftAndHard with DropOldest.
380 Evict(usize),
381
382 /// Evict oldest items until the queue is below the hard watermark,
383 /// then admit if possible. Caller performs repeated evictions.
384 EvictUntilBelowHard,
385
386 /// Block the producer until space becomes available (edge decides how to block).
387 Block,
388}
389
390/// Per-edge policy bundle.
391///
392/// `caps` → soft/hard watermarks; `admission` → behavior between soft/hard;
393/// `over_budget` → action when capacity/budget constraints are breached.
394#[non_exhaustive]
395#[derive(Debug, Clone, Copy, PartialEq, Eq)]
396pub struct EdgePolicy {
397 /// Capacity and watermarks.
398 pub caps: QueueCaps,
399 /// Admission behavior between soft and hard thresholds.
400 pub admission: AdmissionPolicy,
401 /// Action to take on over-budget scenarios at this edge.
402 pub over_budget: OverBudgetAction,
403}
404
405impl EdgePolicy {
406 /// Construct a new `EdgePolicy`.
407 pub const fn new(
408 caps: QueueCaps,
409 admission: AdmissionPolicy,
410 over_budget: OverBudgetAction,
411 ) -> Self {
412 Self {
413 caps,
414 admission,
415 over_budget,
416 }
417 }
418
419 /// Borrow caps.
420 #[inline]
421 pub const fn caps(&self) -> &QueueCaps {
422 &self.caps
423 }
424
425 /// Borrow admission policy.
426 #[inline]
427 pub const fn admission(&self) -> &AdmissionPolicy {
428 &self.admission
429 }
430
431 /// Borrow over-budget action.
432 #[inline]
433 pub const fn over_budget(&self) -> &OverBudgetAction {
434 &self.over_budget
435 }
436
437 /// Compute a watermark state from occupancy stats.
438 pub fn watermark(&self, items: usize, bytes: usize) -> WatermarkState {
439 if self.caps.at_or_above_hard(items, bytes) {
440 WatermarkState::AtOrAboveHard
441 } else if self.caps.below_soft(items, bytes) {
442 WatermarkState::BelowSoft
443 } else {
444 WatermarkState::BetweenSoftAndHard
445 }
446 }
447
448 /// Apply admission logic based on header hints (deadline/qos) and occupancy.
449 ///
450 /// # Behaviour
451 ///
452 /// - `BelowSoft` => `Admit`.
453 /// - `BetweenSoftAndHard`:
454 /// - `DeadlineAndQoSAware` => consult deadline/qos to decide (TODO: full impl).
455 /// - `DropNewest` => `DropNewest`.
456 /// - `DropOldest` => `Evict(1)`.
457 /// - `Block` => `Block`.
458 /// - `AtOrAboveHard`:
459 /// - If the *single* incoming item cannot fit under the hard cap (even when
460 /// the queue is empty), the item is `Reject`ed immediately.
461 /// - Otherwise:
462 /// - `DropNewest` => `DropNewest`.
463 /// - `DropOldest` => `EvictUntilBelowHard`.
464 /// - `DeadlineAndQoSAware` => conservative default `Reject` (or consult
465 /// deadline/qos if implemented).
466 /// - `Block` => `Block`.
467 ///
468 /// # Warning
469 ///
470 /// QoS-aware behaviour is **not yet implemented**. The `DeadlineAndQoSAware`
471 /// branch currently uses conservative defaults:
472 /// - between soft/hard it defaults to `Admit`, and
473 /// - at-or-above-hard it defaults to `Reject`.
474 ///
475 /// Implementors should update this method to use deadline and QoS hints
476 /// when the scheduler rules are available.
477 pub fn decide(
478 &self,
479 items: usize,
480 bytes: usize,
481 item_bytes: usize,
482 _deadline: Option<DeadlineNs>,
483 _qos: QoSClass,
484 ) -> AdmissionDecision {
485 match self.watermark(items, bytes) {
486 WatermarkState::BelowSoft => AdmissionDecision::Admit,
487
488 WatermarkState::BetweenSoftAndHard => match self.admission {
489 AdmissionPolicy::DeadlineAndQoSAware => {
490 // TODO: implement full deadline/qos logic. For now, admit.
491 AdmissionDecision::Admit
492 }
493 AdmissionPolicy::DropNewest => AdmissionDecision::DropNewest,
494 AdmissionPolicy::DropOldest => AdmissionDecision::Evict(1),
495 AdmissionPolicy::Block => AdmissionDecision::Block,
496 },
497
498 WatermarkState::AtOrAboveHard => {
499 // If item alone cannot fit under hard caps, reject immediately.
500 if self.caps.at_or_above_hard(0, item_bytes) {
501 return AdmissionDecision::Reject;
502 }
503
504 match self.admission {
505 AdmissionPolicy::DeadlineAndQoSAware => {
506 // Conservative default: reject at or above hard.
507 // Or evaluate deadline/qos here and map to DropOldest/DropNewest.
508 AdmissionDecision::Reject
509 }
510 AdmissionPolicy::DropNewest => AdmissionDecision::DropNewest,
511 AdmissionPolicy::DropOldest => AdmissionDecision::EvictUntilBelowHard,
512 AdmissionPolicy::Block => AdmissionDecision::Block,
513 }
514 }
515 }
516 }
517}
518
519/// Policy bundle attached to a node.
520///
521/// Used by schedulers:
522/// - `batching.fixed_n`/`max_delta_t` guide batch formation.
523/// - `budget.tick_budget` (soft) and `budget.watchdog_ticks` (hard) guide time budgeting.
524/// - `deadline.default_deadline_ns` allows EDF synthesis when inputs have no deadlines,
525/// `deadline.slack_tolerance_ns` provides grace, and `require_absolute_deadline` enforces strictness.
526#[non_exhaustive]
527#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
528pub struct NodePolicy {
529 /// Batch formation policy.
530 batching: BatchingPolicy,
531 /// Budget policy for execution steps.
532 budget: BudgetPolicy,
533 /// Deadline policy for inputs/outputs.
534 deadline: DeadlinePolicy,
535}
536
537impl NodePolicy {
538 /// Construct a `NodePolicy` explicitly.
539 pub const fn new(
540 batching: BatchingPolicy,
541 budget: BudgetPolicy,
542 deadline: DeadlinePolicy,
543 ) -> Self {
544 Self {
545 batching,
546 budget,
547 deadline,
548 }
549 }
550
551 /// Borrow the batching policy.
552 #[inline]
553 pub const fn batching(&self) -> &BatchingPolicy {
554 &self.batching
555 }
556
557 /// Borrow the budget policy.
558 #[inline]
559 pub const fn budget(&self) -> &BudgetPolicy {
560 &self.budget
561 }
562
563 /// Borrow the deadline policy.
564 #[inline]
565 pub const fn deadline(&self) -> &DeadlinePolicy {
566 &self.deadline
567 }
568}