1#![allow(unused_imports)]
2use crate::numeric::BackendNumericPolicy;
5
/// Numeric policy instance used by every planner in this module, constructed
/// with the label `"device work queue"`.
///
/// It supplies the basis-point ratio helper used for initial occupancy and the
/// checked ceiling division used for drain-chunk counts.
// NOTE(review): the label is presumably used in diagnostics emitted by
// `BackendNumericPolicy` — confirm against `crate::numeric`.
const DEVICE_WORK_QUEUE_NUMERIC: BackendNumericPolicy =
    BackendNumericPolicy::new("device work queue");
8
/// Host synchronization mode requested for a device work queue.
///
/// [`plan_device_work_queue`] accepts only [`WorkQueueHostSync::FinalOnly`];
/// any other mode is rejected with
/// [`DeviceWorkQueueError::HostParticipationRejected`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkQueueHostSync {
    /// The only mode the planners in this module accept. The rejection message
    /// describes it as "final-only completion readback".
    FinalOnly,
    /// The host takes part in queue processing; always rejected by the planner.
    HostParticipates,
}
17
/// Input to [`plan_device_work_queue`]: sizing, budget, and host-sync mode of
/// one device work queue with an explicit capacity.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeviceWorkQueueProfile {
    /// Number of items the queue starts with; must not exceed `queue_capacity`.
    pub initial_items: u64,
    /// Number of entries the queue can hold; must be non-zero.
    pub queue_capacity: u64,
    /// Size of one queue entry in bytes; must be non-zero.
    pub entry_bytes: u64,
    /// Bytes added to the entry storage when computing the resident footprint.
    // NOTE(review): presumably queue control/bookkeeping state — confirm with the backend.
    pub control_bytes: u64,
    /// Upper bound, in bytes, on entry storage plus `control_bytes`.
    pub budget_bytes: u64,
    /// Requested host synchronization mode; only `FinalOnly` is accepted.
    pub host_sync: WorkQueueHostSync,
}
34
/// Input to [`plan_device_work_queue_with_expansion`]: like
/// [`DeviceWorkQueueProfile`], but the capacity is derived from the initial
/// items plus requested expansion headroom instead of being given directly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeviceWorkQueueExpansionProfile {
    /// Number of items the queue starts with; the derived capacity is never
    /// smaller than this.
    pub initial_items: u64,
    /// Extra entries requested on top of `initial_items`; the planner grants
    /// fewer when the budget cannot pay for all of them.
    pub expansion_items: u64,
    /// Size of one queue entry in bytes; must be non-zero.
    pub entry_bytes: u64,
    /// Bytes added to the entry storage when computing the resident footprint.
    // NOTE(review): presumably queue control/bookkeeping state — confirm with the backend.
    pub control_bytes: u64,
    /// Upper bound, in bytes, on entry storage plus `control_bytes`.
    pub budget_bytes: u64,
    /// Requested host synchronization mode; only `FinalOnly` is accepted.
    pub host_sync: WorkQueueHostSync,
}
53
/// Result of [`plan_device_work_queue`]: the byte footprint and initial
/// occupancy of a validated queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeviceWorkQueuePlan {
    /// Entry storage in bytes: `queue_capacity * entry_bytes`.
    pub queue_bytes: u64,
    /// Copied unchanged from the profile's `control_bytes`.
    pub control_bytes: u64,
    /// Total footprint in bytes: `queue_bytes + control_bytes`.
    pub resident_bytes: u64,
    /// Ratio of `initial_items` to `queue_capacity` in basis points, as
    /// computed by the shared numeric policy (a full queue reports 10 000).
    pub initial_occupancy_bps: u32,
    /// Always `true` for plans produced in this module, because any other
    /// host-sync mode is rejected before a plan is built.
    pub final_only_host_sync: bool,
}
68
/// How [`plan_device_work_queue_backpressure`] splits draining of the queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeviceWorkQueueDrainStrategy {
    /// Selected when the computed chunk count is exactly one.
    SingleResidentDrain,
    /// Selected for every other chunk count.
    ChunkedResidentDrain,
}
78
/// Result of [`plan_device_work_queue_backpressure`]: the underlying queue
/// plan plus the drain chunking derived from the per-launch item limit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeviceWorkQueueBackpressurePlan {
    /// The queue plan produced by [`plan_device_work_queue`] for the same profile.
    pub queue: DeviceWorkQueuePlan,
    /// Single drain when `chunks == 1`, chunked drain otherwise.
    pub strategy: DeviceWorkQueueDrainStrategy,
    /// The per-launch item limit, capped at the queue capacity.
    pub items_per_chunk: u64,
    /// Queue capacity divided by the per-launch item limit, rounded up.
    pub chunks: u64,
    /// Always `true` for plans produced in this module.
    pub final_only_host_sync: bool,
}
93
/// Reasons a device work queue cannot be planned.
///
/// Every [`std::fmt::Display`] message names the problem and ends with a
/// `Fix:` hint for the caller.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DeviceWorkQueueError {
    /// The queue capacity is zero.
    ZeroCapacity,
    /// The per-entry byte width is zero.
    ZeroEntryBytes,
    /// The per-launch drain limit is zero.
    ZeroDrainChunk,
    /// More initial items were supplied than the queue can hold.
    InitialItemsExceedCapacity {
        /// Initial item count from the rejected profile.
        initial_items: u64,
        /// Queue capacity from the rejected profile.
        queue_capacity: u64,
    },
    /// A host-sync mode other than final-only was requested.
    HostParticipationRejected,
    /// A `u64` computation overflowed (or a checked division reported failure).
    ByteCountOverflow {
        /// Label of the quantity that was being computed.
        field: &'static str,
    },
    /// The resident footprint exceeds the explicit byte budget.
    OverBudget {
        /// Bytes the planned queue needs.
        required_bytes: u64,
        /// Bytes the profile allows.
        budget_bytes: u64,
    },
}

impl std::fmt::Display for DeviceWorkQueueError {
    /// Writes the diagnostic for this error, including its `Fix:` hint.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants that carry data format their message and return directly;
        // the remaining variants resolve to a static message written once below.
        let message = match self {
            Self::InitialItemsExceedCapacity {
                initial_items,
                queue_capacity,
            } => {
                return write!(
                    f,
                    "device work queue initial_items={initial_items} exceeds queue_capacity={queue_capacity}. Fix: shard initial frontier items or increase explicit queue capacity."
                );
            }
            Self::ByteCountOverflow { field } => {
                return write!(
                    f,
                    "device work queue overflowed while computing {field}. Fix: shard the dependent dataflow workload before queue planning."
                );
            }
            Self::OverBudget {
                required_bytes,
                budget_bytes,
            } => {
                return write!(
                    f,
                    "device work queue requires {required_bytes} bytes but budget allows {budget_bytes}. Fix: reduce queue capacity, shard the graph, or raise the explicit device budget."
                );
            }
            Self::ZeroCapacity => {
                "device work queue capacity is zero. Fix: size the resident queue before launch."
            }
            Self::ZeroEntryBytes => {
                "device work queue entry_bytes is zero. Fix: pass the concrete queue-entry ABI width."
            }
            Self::ZeroDrainChunk => {
                "device work queue drain chunk is zero. Fix: pass a non-zero device-side drain window."
            }
            Self::HostParticipationRejected => {
                "device work queue rejected host participation. Fix: use final-only completion readback so dependent dataflow stays device-side."
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for DeviceWorkQueueError {}
168
169fn checked_add(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
170 lhs.checked_add(rhs)
171 .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
172}
173
174fn checked_mul(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
175 lhs.checked_mul(rhs)
176 .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
177}
178
179pub fn plan_device_work_queue(
181 profile: DeviceWorkQueueProfile,
182) -> Result<DeviceWorkQueuePlan, DeviceWorkQueueError> {
183 if profile.queue_capacity == 0 {
184 return Err(DeviceWorkQueueError::ZeroCapacity);
185 }
186 if profile.entry_bytes == 0 {
187 return Err(DeviceWorkQueueError::ZeroEntryBytes);
188 }
189 if profile.initial_items > profile.queue_capacity {
190 return Err(DeviceWorkQueueError::InitialItemsExceedCapacity {
191 initial_items: profile.initial_items,
192 queue_capacity: profile.queue_capacity,
193 });
194 }
195 if profile.host_sync != WorkQueueHostSync::FinalOnly {
196 return Err(DeviceWorkQueueError::HostParticipationRejected);
197 }
198
199 let queue_bytes = checked_mul(profile.queue_capacity, profile.entry_bytes, "queue bytes")?;
200 let resident_bytes = checked_add(queue_bytes, profile.control_bytes, "resident bytes")?;
201 if resident_bytes > profile.budget_bytes {
202 return Err(DeviceWorkQueueError::OverBudget {
203 required_bytes: resident_bytes,
204 budget_bytes: profile.budget_bytes,
205 });
206 }
207 let initial_occupancy_bps = DEVICE_WORK_QUEUE_NUMERIC.ratio_basis_points_u64(
208 profile.initial_items,
209 profile.queue_capacity,
210 0,
211 "device work queue initial occupancy",
212 );
213
214 Ok(DeviceWorkQueuePlan {
215 queue_bytes,
216 control_bytes: profile.control_bytes,
217 resident_bytes,
218 initial_occupancy_bps,
219 final_only_host_sync: true,
220 })
221}
222
223pub fn plan_device_work_queue_with_expansion(
226 profile: DeviceWorkQueueExpansionProfile,
227) -> Result<DeviceWorkQueuePlan, DeviceWorkQueueError> {
228 let desired_capacity = checked_add(
229 profile.initial_items,
230 profile.expansion_items,
231 "queue expansion capacity",
232 )?;
233 if profile.entry_bytes == 0 {
234 return plan_device_work_queue(DeviceWorkQueueProfile {
235 initial_items: profile.initial_items,
236 queue_capacity: desired_capacity,
237 entry_bytes: profile.entry_bytes,
238 control_bytes: profile.control_bytes,
239 budget_bytes: profile.budget_bytes,
240 host_sync: profile.host_sync,
241 });
242 }
243 let budget_capacity =
244 profile.budget_bytes.saturating_sub(profile.control_bytes) / profile.entry_bytes;
245 let queue_capacity = desired_capacity
246 .min(budget_capacity)
247 .max(profile.initial_items);
248 plan_device_work_queue(DeviceWorkQueueProfile {
249 initial_items: profile.initial_items,
250 queue_capacity,
251 entry_bytes: profile.entry_bytes,
252 control_bytes: profile.control_bytes,
253 budget_bytes: profile.budget_bytes,
254 host_sync: profile.host_sync,
255 })
256}
257
258pub fn plan_device_work_queue_backpressure(
260 profile: DeviceWorkQueueProfile,
261 max_items_per_drain_launch: u64,
262) -> Result<DeviceWorkQueueBackpressurePlan, DeviceWorkQueueError> {
263 if max_items_per_drain_launch == 0 {
264 return Err(DeviceWorkQueueError::ZeroDrainChunk);
265 }
266 let queue = plan_device_work_queue(profile)?;
267 let chunks = div_ceil_u64(
268 profile.queue_capacity,
269 max_items_per_drain_launch,
270 "drain chunks",
271 )?;
272 let strategy = if chunks == 1 {
273 DeviceWorkQueueDrainStrategy::SingleResidentDrain
274 } else {
275 DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
276 };
277 Ok(DeviceWorkQueueBackpressurePlan {
278 queue,
279 strategy,
280 items_per_chunk: max_items_per_drain_launch.min(profile.queue_capacity),
281 chunks,
282 final_only_host_sync: true,
283 })
284}
285
286fn div_ceil_u64(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
287 DEVICE_WORK_QUEUE_NUMERIC
288 .checked_ceil_div_u64(lhs, rhs)
289 .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
290}
291
#[cfg(test)]
mod tests {
    //! Unit tests for the device work-queue planners: fixed examples, error
    //! paths, `u64::MAX` boundaries, source-text guards, and a deterministic
    //! generated sweep.

    use super::*;

    /// Source-text guard: the production section must route arithmetic through
    /// the shared numeric policy and the local checked helpers.
    #[test]
    fn device_work_queue_uses_shared_driver_numeric_policy() {
        // NOTE(review): assumes `device_work_queue.rs` is this very file.
        let source = include_str!("device_work_queue.rs");
        let production = source
            .split("#[cfg(test)]")
            .next()
            .expect("Fix: device work-queue source must contain production section");

        // Search the production section only. Searching the whole file would
        // always succeed, because the string literals in these assertions are
        // themselves part of the file.
        assert!(production.contains("BackendNumericPolicy::new"));
        assert!(production.contains("DEVICE_WORK_QUEUE_NUMERIC"));
        assert!(production.contains("checked_ceil_div_u64"));
        assert!(production.contains("fn checked_mul("));
        assert!(production.contains("fn checked_add("));
        assert!(!production.contains("CudaArithmeticOverflow"));
    }

    /// Happy path: 1 024 entries of 16 bytes plus 128 control bytes.
    #[test]
    fn device_work_queue_plans_final_only_resident_execution() {
        let plan = plan_device_work_queue(DeviceWorkQueueProfile {
            initial_items: 256,
            queue_capacity: 1_024,
            entry_bytes: 16,
            control_bytes: 128,
            budget_bytes: 32_768,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: valid device work queue should plan");

        assert_eq!(plan.queue_bytes, 16_384);
        assert_eq!(plan.control_bytes, 128);
        assert_eq!(plan.resident_bytes, 16_512);
        assert_eq!(plan.initial_occupancy_bps, 2_500);
        assert!(plan.final_only_host_sync);
    }

    /// The full requested expansion fits, so capacity is initial + expansion.
    #[test]
    fn device_work_queue_expansion_uses_budgeted_resident_headroom() {
        let plan = plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
            initial_items: 4,
            expansion_items: 12,
            entry_bytes: 8,
            control_bytes: 64,
            budget_bytes: 256,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: expansion headroom should fit inside the explicit queue budget");

        assert_eq!(plan.queue_bytes, 128);
        assert_eq!(plan.control_bytes, 64);
        assert_eq!(plan.resident_bytes, 192);
        assert_eq!(
            plan.initial_occupancy_bps, 2_500,
            "Fix: occupancy must use the expanded resident queue capacity"
        );
        assert!(plan.final_only_host_sync);
    }

    /// The budget pays for only 10 entries, so the expansion is clamped.
    #[test]
    fn device_work_queue_expansion_clamps_to_budget_without_dropping_initial_items() {
        let plan = plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
            initial_items: 4,
            expansion_items: 100,
            entry_bytes: 8,
            control_bytes: 16,
            budget_bytes: 96,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: queue expansion should use all affordable headroom");

        assert_eq!(plan.queue_bytes, 80);
        assert_eq!(plan.resident_bytes, 96);
        assert_eq!(
            plan.initial_occupancy_bps, 4_000,
            "Fix: initial occupancy should reflect budget-clamped expansion capacity"
        );
    }

    /// Capacity is never clamped below the initial items, so an unaffordable
    /// initial frontier surfaces as `OverBudget`.
    #[test]
    fn device_work_queue_expansion_fails_when_initial_frontier_cannot_fit() {
        assert_eq!(
            plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                initial_items: 8,
                expansion_items: 100,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 128,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("initial frontier must fail when it cannot fit the explicit budget"),
            DeviceWorkQueueError::OverBudget {
                required_bytes: 192,
                budget_bytes: 128,
            }
        );
    }

    /// `initial_items + expansion_items` overflowing `u64` is reported early.
    #[test]
    fn device_work_queue_expansion_rejects_capacity_overflow() {
        assert_eq!(
            plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                initial_items: u64::MAX,
                expansion_items: 1,
                entry_bytes: 1,
                control_bytes: 0,
                budget_bytes: u64::MAX,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("overflowed expansion capacity must fail before queue planning"),
            DeviceWorkQueueError::ByteCountOverflow {
                field: "queue expansion capacity",
            }
        );
    }

    #[test]
    fn device_work_queue_rejects_host_participation() {
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::HostParticipates,
            })
            .expect_err("host participation should fail"),
            DeviceWorkQueueError::HostParticipationRejected
        );
    }

    #[test]
    fn device_work_queue_rejects_invalid_capacity_and_budget() {
        // More initial items than capacity.
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 9,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("initial overflow should fail"),
            DeviceWorkQueueError::InitialItemsExceedCapacity {
                initial_items: 9,
                queue_capacity: 8,
            }
        );
        // 8 * 16 + 64 = 192 bytes against a 128-byte budget.
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 128,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("over-budget queue should fail"),
            DeviceWorkQueueError::OverBudget {
                required_bytes: 192,
                budget_bytes: 128,
            }
        );
    }

    #[test]
    fn device_work_queue_occupancy_uses_widened_arithmetic_for_huge_queues() {
        let plan = plan_device_work_queue(DeviceWorkQueueProfile {
            initial_items: u64::MAX,
            queue_capacity: u64::MAX,
            entry_bytes: 1,
            control_bytes: 0,
            budget_bytes: u64::MAX,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: max-sized byte queue should fit exactly");

        assert_eq!(
            plan.initial_occupancy_bps, 10_000,
            "Fix: device work-queue occupancy must not use saturating u64 multiplication before division; full queues must report 10000 bps even near u64::MAX."
        );
    }

    /// Source-text guard: the needle is assembled with `concat!` so this test's
    /// own text does not contain the contiguous string it searches for.
    #[test]
    fn device_work_queue_occupancy_uses_shared_numeric_helper() {
        let source = include_str!("device_work_queue.rs");

        assert!(
            source.contains(concat!("DEVICE_WORK_QUEUE_NUMERIC.", "ratio_basis_points_u64")),
            "Fix: device work-queue occupancy must use the shared driver numeric ratio helper instead of a backend-local basis-point formula."
        );
    }

    #[test]
    fn device_work_queue_backpressure_chunks_large_resident_queues_without_host_participation() {
        let plan = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: 4_096,
                queue_capacity: 65_536,
                entry_bytes: 16,
                control_bytes: 128,
                budget_bytes: 2 << 20,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            8_192,
        )
        .expect("Fix: large resident work queue should plan bounded device-side drain chunks");

        assert_eq!(
            plan.strategy,
            DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
        );
        assert_eq!(plan.items_per_chunk, 8_192);
        assert_eq!(plan.chunks, 8);
        assert_eq!(plan.queue.resident_bytes, 1_048_704);
        assert!(plan.final_only_host_sync);
        assert!(plan.queue.final_only_host_sync);
    }

    /// ceil(u64::MAX / 65 536) = 2^48; a naive `(lhs + rhs - 1) / rhs` would overflow.
    #[test]
    fn device_work_queue_backpressure_ceil_division_handles_max_capacity() {
        let plan = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: u64::MAX,
                queue_capacity: u64::MAX,
                entry_bytes: 1,
                control_bytes: 0,
                budget_bytes: u64::MAX,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            65_536,
        )
        .expect("Fix: ceil division for max-capacity queues must not overflow");

        assert_eq!(
            plan.strategy,
            DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
        );
        assert_eq!(plan.queue.queue_bytes, u64::MAX);
        assert_eq!(plan.items_per_chunk, 65_536);
        assert_eq!(plan.chunks, 281_474_976_710_656);
        assert!(plan.final_only_host_sync);
    }

    #[test]
    fn device_work_queue_backpressure_rejects_zero_drain_chunk() {
        let err = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            0,
        )
        .expect_err("zero drain chunk must fail loudly");

        assert_eq!(err, DeviceWorkQueueError::ZeroDrainChunk);
    }

    /// Deterministic sweep over 2 048 generated profiles; every profile is
    /// valid by construction, so all three planners must succeed.
    #[test]
    fn generated_device_work_queue_profiles_preserve_budget_and_sync_contracts() {
        let mut state = 0xa409_3822_299f_31d0_u64;
        for case_index in 0..2048usize {
            let queue_capacity = 1 + next_u64(&mut state) % 262_144;
            let entry_bytes = 1 + next_u64(&mut state) % 256;
            let initial_items = next_u64(&mut state) % (queue_capacity + 1);
            let control_bytes = next_u64(&mut state) % 4096;
            let queue_bytes = queue_capacity
                .checked_mul(entry_bytes)
                .expect("Fix: generated queue byte count should fit");
            let resident_bytes = queue_bytes
                .checked_add(control_bytes)
                .expect("Fix: generated resident byte count should fit");
            // The budget always covers the footprint, with up to 8 191 spare bytes.
            let budget_bytes = resident_bytes + (next_u64(&mut state) % 8192);
            let profile = DeviceWorkQueueProfile {
                initial_items,
                queue_capacity,
                entry_bytes,
                control_bytes,
                budget_bytes,
                host_sync: WorkQueueHostSync::FinalOnly,
            };

            let plan = plan_device_work_queue(profile)
                .expect("Fix: generated valid queue profile must plan");
            assert_eq!(plan.queue_bytes, queue_bytes, "case {case_index}");
            assert_eq!(plan.control_bytes, control_bytes, "case {case_index}");
            assert_eq!(plan.resident_bytes, resident_bytes, "case {case_index}");
            assert!(plan.resident_bytes <= budget_bytes, "case {case_index}");
            assert!(plan.initial_occupancy_bps <= 10_000, "case {case_index}");
            assert!(plan.final_only_host_sync, "case {case_index}");

            let drain = 1 + next_u64(&mut state) % queue_capacity;
            let backpressure = plan_device_work_queue_backpressure(profile, drain)
                .expect("Fix: generated valid backpressure profile must plan");
            assert_eq!(backpressure.queue, plan, "case {case_index}");
            assert!(
                backpressure.items_per_chunk <= queue_capacity,
                "case {case_index}"
            );
            assert!(backpressure.chunks >= 1, "case {case_index}");
            assert!(backpressure.final_only_host_sync, "case {case_index}");

            let expansion_items = next_u64(&mut state) % queue_capacity;
            let expansion_budget = resident_bytes + (expansion_items * entry_bytes);
            let expansion =
                plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                    initial_items,
                    expansion_items,
                    entry_bytes,
                    control_bytes,
                    budget_bytes: expansion_budget,
                    host_sync: WorkQueueHostSync::FinalOnly,
                })
                .expect("Fix: generated valid expansion queue profile must plan");
            assert!(
                expansion.resident_bytes <= expansion_budget,
                "case {case_index}"
            );
            assert!(
                expansion.queue_bytes >= initial_items * entry_bytes,
                "case {case_index}"
            );
            assert!(expansion.final_only_host_sync, "case {case_index}");
        }
    }

    /// Advances `state` with a wrapping multiply-add and returns the new
    /// value, giving the sweep a reproducible sequence without external crates.
    fn next_u64(state: &mut u64) -> u64 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1_442_695_040_888_963_407);
        *state
    }
}