laminar_core/budget/
task_budget.rs1use std::time::Instant;
4
5use super::stats::BudgetMetrics;
6
7#[derive(Debug)]
34pub struct TaskBudget {
35 start: Instant,
37 budget_ns: u64,
39 name: &'static str,
41 ring: u8,
43 record_metrics: bool,
45}
46
47impl TaskBudget {
48 pub const RING0_EVENT_NS: u64 = 500;
52
53 pub const RING0_BATCH_NS: u64 = 5_000;
55
56 pub const RING0_LOOKUP_NS: u64 = 200;
58
59 pub const RING0_WINDOW_NS: u64 = 10_000;
61
62 pub const RING0_ITERATION_NS: u64 = 10_000;
64
65 pub const RING1_CHUNK_NS: u64 = 1_000_000;
69
70 pub const RING1_CHECKPOINT_NS: u64 = 10_000_000;
72
73 pub const RING1_WAL_FLUSH_NS: u64 = 100_000;
75
76 pub const RING1_COMPACTION_NS: u64 = 5_000_000;
78
79 #[inline]
85 #[must_use]
86 pub fn ring0_event() -> Self {
87 Self {
88 start: Instant::now(),
89 budget_ns: Self::RING0_EVENT_NS,
90 name: "ring0_event",
91 ring: 0,
92 record_metrics: true,
93 }
94 }
95
96 #[inline]
101 #[must_use]
102 pub fn ring0_event_untracked() -> Self {
103 Self {
104 start: Instant::now(),
105 budget_ns: Self::RING0_EVENT_NS,
106 name: "ring0_event",
107 ring: 0,
108 record_metrics: false,
109 }
110 }
111
112 #[inline]
116 #[must_use]
117 pub fn ring0_batch() -> Self {
118 Self {
119 start: Instant::now(),
120 budget_ns: Self::RING0_BATCH_NS,
121 name: "ring0_batch",
122 ring: 0,
123 record_metrics: true,
124 }
125 }
126
127 #[inline]
131 #[must_use]
132 pub fn ring0_lookup() -> Self {
133 Self {
134 start: Instant::now(),
135 budget_ns: Self::RING0_LOOKUP_NS,
136 name: "ring0_lookup",
137 ring: 0,
138 record_metrics: true,
139 }
140 }
141
142 #[inline]
146 #[must_use]
147 pub fn ring0_window() -> Self {
148 Self {
149 start: Instant::now(),
150 budget_ns: Self::RING0_WINDOW_NS,
151 name: "ring0_window",
152 ring: 0,
153 record_metrics: true,
154 }
155 }
156
157 #[inline]
161 #[must_use]
162 pub fn ring0_iteration() -> Self {
163 Self {
164 start: Instant::now(),
165 budget_ns: Self::RING0_ITERATION_NS,
166 name: "ring0_iteration",
167 ring: 0,
168 record_metrics: true,
169 }
170 }
171
172 #[inline]
178 #[must_use]
179 pub fn ring1_chunk() -> Self {
180 Self {
181 start: Instant::now(),
182 budget_ns: Self::RING1_CHUNK_NS,
183 name: "ring1_chunk",
184 ring: 1,
185 record_metrics: true,
186 }
187 }
188
189 #[inline]
193 #[must_use]
194 pub fn ring1_checkpoint() -> Self {
195 Self {
196 start: Instant::now(),
197 budget_ns: Self::RING1_CHECKPOINT_NS,
198 name: "ring1_checkpoint",
199 ring: 1,
200 record_metrics: true,
201 }
202 }
203
204 #[inline]
208 #[must_use]
209 pub fn ring1_wal_flush() -> Self {
210 Self {
211 start: Instant::now(),
212 budget_ns: Self::RING1_WAL_FLUSH_NS,
213 name: "ring1_wal_flush",
214 ring: 1,
215 record_metrics: true,
216 }
217 }
218
219 #[inline]
223 #[must_use]
224 pub fn ring1_compaction() -> Self {
225 Self {
226 start: Instant::now(),
227 budget_ns: Self::RING1_COMPACTION_NS,
228 name: "ring1_compaction",
229 ring: 1,
230 record_metrics: true,
231 }
232 }
233
234 #[inline]
244 #[must_use]
245 pub fn custom(name: &'static str, ring: u8, budget_ns: u64) -> Self {
246 Self {
247 start: Instant::now(),
248 budget_ns,
249 name,
250 ring,
251 record_metrics: true,
252 }
253 }
254
255 #[inline]
257 #[must_use]
258 pub fn custom_untracked(name: &'static str, ring: u8, budget_ns: u64) -> Self {
259 Self {
260 start: Instant::now(),
261 budget_ns,
262 name,
263 ring,
264 record_metrics: false,
265 }
266 }
267
268 #[inline]
272 #[must_use]
273 pub fn name(&self) -> &'static str {
274 self.name
275 }
276
277 #[inline]
279 #[must_use]
280 pub fn ring(&self) -> u8 {
281 self.ring
282 }
283
284 #[inline]
286 #[must_use]
287 pub fn budget_ns(&self) -> u64 {
288 self.budget_ns
289 }
290
291 #[inline]
299 #[must_use]
300 #[allow(clippy::cast_possible_truncation)]
301 pub fn elapsed_ns(&self) -> u64 {
302 self.start.elapsed().as_nanos() as u64
303 }
304
305 #[inline]
311 #[must_use]
312 #[allow(clippy::cast_possible_wrap)]
313 pub fn remaining_ns(&self) -> i64 {
314 self.budget_ns as i64 - self.elapsed_ns() as i64
318 }
319
320 #[inline]
322 #[must_use]
323 pub fn exceeded(&self) -> bool {
324 self.elapsed_ns() > self.budget_ns
325 }
326
327 #[inline]
331 #[must_use]
332 pub fn almost_exceeded(&self) -> bool {
333 self.elapsed_ns() > (self.budget_ns * 8) / 10
334 }
335
336 #[inline]
340 #[must_use]
341 pub fn half_used(&self) -> bool {
342 self.elapsed_ns() > self.budget_ns / 2
343 }
344
345 #[inline]
349 #[must_use]
350 pub fn percentage_used(&self) -> u64 {
351 let elapsed = self.elapsed_ns();
352 if self.budget_ns == 0 {
353 return 100;
354 }
355 (elapsed * 100) / self.budget_ns
356 }
357}
358
359impl Drop for TaskBudget {
360 fn drop(&mut self) {
361 if self.record_metrics {
362 let elapsed = self.elapsed_ns();
363 BudgetMetrics::global().record_task(self.name, self.ring, self.budget_ns, elapsed);
364 }
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use std::thread;
372 use std::time::Duration;
373
374 #[test]
375 fn test_elapsed_increases() {
376 let budget = TaskBudget::ring1_chunk();
377 let t1 = budget.elapsed_ns();
378 thread::sleep(Duration::from_micros(100));
379 let t2 = budget.elapsed_ns();
380 assert!(t2 > t1);
381 }
382
383 #[test]
384 fn test_remaining_decreases() {
385 let budget = TaskBudget::ring1_chunk();
386 let r1 = budget.remaining_ns();
387 thread::sleep(Duration::from_micros(100));
388 let r2 = budget.remaining_ns();
389 assert!(r2 < r1);
390 }
391
392 #[test]
393 fn test_percentage_used() {
394 let budget = TaskBudget::custom("test", 0, 100_000); let pct = budget.percentage_used();
398 assert!(pct < 50, "Early percentage {pct} should be low");
399 }
400
401 #[test]
402 fn test_half_used() {
403 let budget = TaskBudget::custom("test", 0, 100_000); assert!(!budget.half_used());
407
408 thread::sleep(Duration::from_micros(60));
410
411 assert!(budget.half_used());
413 }
414
415 #[test]
416 fn test_untracked_budget() {
417 let budget = TaskBudget::ring0_event_untracked();
418 assert!(!budget.record_metrics);
419
420 let budget2 = TaskBudget::custom_untracked("test", 0, 1000);
421 assert!(!budget2.record_metrics);
422 }
423
424 #[test]
425 fn test_ring0_iteration() {
426 let budget = TaskBudget::ring0_iteration();
427 assert_eq!(budget.name(), "ring0_iteration");
428 assert_eq!(budget.budget_ns(), TaskBudget::RING0_ITERATION_NS);
429 }
430
431 #[test]
432 fn test_ring1_compaction() {
433 let budget = TaskBudget::ring1_compaction();
434 assert_eq!(budget.name(), "ring1_compaction");
435 assert_eq!(budget.budget_ns(), TaskBudget::RING1_COMPACTION_NS);
436 }
437}