Skip to main content

vyre_driver/
device_work_queue.rs

1#![allow(unused_imports)]
2//! Backend-neutral device-side work queue planning for dependent dataflow execution.
3
4use crate::numeric::BackendNumericPolicy;
5
6const DEVICE_WORK_QUEUE_NUMERIC: BackendNumericPolicy =
7    BackendNumericPolicy::new("device work queue");
8
/// How the host is allowed to synchronize with a device-side work queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkQueueHostSync {
    /// The host reads nothing but the final completion state, after the
    /// device has finished draining the queue.
    FinalOnly,
    /// The host takes part while the queue is still being drained.
    HostParticipates,
}
17
18/// Work queue workload profile.
19#[derive(Clone, Copy, Debug, Eq, PartialEq)]
20pub struct DeviceWorkQueueProfile {
21    /// Initial active work items enqueued before launch.
22    pub initial_items: u64,
23    /// Maximum resident queue capacity in work items.
24    pub queue_capacity: u64,
25    /// ABI bytes per queue entry.
26    pub entry_bytes: u64,
27    /// Bytes required for queue head/tail counters and changed flags.
28    pub control_bytes: u64,
29    /// Caller-approved device-memory budget.
30    pub budget_bytes: u64,
31    /// Host synchronization policy.
32    pub host_sync: WorkQueueHostSync,
33}
34
35/// Work queue profile where a resident queue should reserve device-side
36/// expansion headroom in addition to the initial frontier.
37#[derive(Clone, Copy, Debug, Eq, PartialEq)]
38pub struct DeviceWorkQueueExpansionProfile {
39    /// Initial active work items enqueued before launch.
40    pub initial_items: u64,
41    /// Additional device-produced work items the queue should absorb when the
42    /// explicit queue budget leaves enough room.
43    pub expansion_items: u64,
44    /// ABI bytes per queue entry.
45    pub entry_bytes: u64,
46    /// Bytes required for queue head/tail counters and changed flags.
47    pub control_bytes: u64,
48    /// Caller-approved device-memory budget for the resident queue.
49    pub budget_bytes: u64,
50    /// Host synchronization policy.
51    pub host_sync: WorkQueueHostSync,
52}
53
/// Byte accounting and occupancy for a planned device-side work queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeviceWorkQueuePlan {
    /// Bytes occupied by the resident queue entries.
    pub queue_bytes: u64,
    /// Bytes occupied by the resident control state.
    pub control_bytes: u64,
    /// Sum of the queue and control bytes kept resident.
    pub resident_bytes: u64,
    /// How full the queue is, in basis points, before any device-side
    /// expansion takes place.
    pub initial_occupancy_bps: u32,
    /// `true` when the plan guarantees the host synchronizes on final state
    /// only.
    pub final_only_host_sync: bool,
}
68
/// How a device-side work queue is drained.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeviceWorkQueueDrainStrategy {
    /// The entire queue is covered by a single resident drain window.
    SingleResidentDrain,
    /// The queue capacity is divided across several resident drain windows,
    /// bounding per-launch queue pressure without involving the host.
    ChunkedResidentDrain,
}
78
79/// Device-side work queue plan with bounded resident drain windows.
80#[derive(Clone, Copy, Debug, Eq, PartialEq)]
81pub struct DeviceWorkQueueBackpressurePlan {
82    /// Base resident queue byte plan.
83    pub queue: DeviceWorkQueuePlan,
84    /// Selected resident drain strategy.
85    pub strategy: DeviceWorkQueueDrainStrategy,
86    /// Maximum queue entries drained by one device-side window.
87    pub items_per_chunk: u64,
88    /// Number of resident drain windows required to cover queue capacity.
89    pub chunks: u64,
90    /// Whether the backpressure plan preserves final-state-only host sync.
91    pub final_only_host_sync: bool,
92}
93
/// Reasons device work queue planning can fail.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DeviceWorkQueueError {
    /// The queue capacity was zero; it must be non-zero.
    ZeroCapacity,
    /// The entry ABI width was zero; it must be given explicitly.
    ZeroEntryBytes,
    /// The device-side drain chunk size was zero; it must be non-zero.
    ZeroDrainChunk,
    /// More initial items were supplied than the queue can hold.
    InitialItemsExceedCapacity {
        /// Initial active items requested.
        initial_items: u64,
        /// Capacity of the queue.
        queue_capacity: u64,
    },
    /// Letting the host participate would bring CPU orchestration back.
    HostParticipationRejected,
    /// A byte-count computation overflowed.
    ByteCountOverflow {
        /// Name of the quantity that was being computed.
        field: &'static str,
    },
    /// The queue is larger than the explicit device budget.
    OverBudget {
        /// Bytes the queue needs.
        required_bytes: u64,
        /// Bytes the budget allows.
        budget_bytes: u64,
    },
}
125
126impl std::fmt::Display for DeviceWorkQueueError {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            Self::ZeroCapacity => write!(
130                f,
131                "device work queue capacity is zero. Fix: size the resident queue before launch."
132            ),
133            Self::ZeroEntryBytes => write!(
134                f,
135                "device work queue entry_bytes is zero. Fix: pass the concrete queue-entry ABI width."
136            ),
137            Self::ZeroDrainChunk => write!(
138                f,
139                "device work queue drain chunk is zero. Fix: pass a non-zero device-side drain window."
140            ),
141            Self::InitialItemsExceedCapacity {
142                initial_items,
143                queue_capacity,
144            } => write!(
145                f,
146                "device work queue initial_items={initial_items} exceeds queue_capacity={queue_capacity}. Fix: shard initial frontier items or increase explicit queue capacity."
147            ),
148            Self::HostParticipationRejected => write!(
149                f,
150                "device work queue rejected host participation. Fix: use final-only completion readback so dependent dataflow stays device-side."
151            ),
152            Self::ByteCountOverflow { field } => write!(
153                f,
154                "device work queue overflowed while computing {field}. Fix: shard the dependent dataflow workload before queue planning."
155            ),
156            Self::OverBudget {
157                required_bytes,
158                budget_bytes,
159            } => write!(
160                f,
161                "device work queue requires {required_bytes} bytes but budget allows {budget_bytes}. Fix: reduce queue capacity, shard the graph, or raise the explicit device budget."
162            ),
163        }
164    }
165}
166
167impl std::error::Error for DeviceWorkQueueError {}
168
169fn checked_add(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
170    lhs.checked_add(rhs)
171        .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
172}
173
174fn checked_mul(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
175    lhs.checked_mul(rhs)
176        .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
177}
178
179/// Plan a device-resident work queue for dependent dataflow execution.
180pub fn plan_device_work_queue(
181    profile: DeviceWorkQueueProfile,
182) -> Result<DeviceWorkQueuePlan, DeviceWorkQueueError> {
183    if profile.queue_capacity == 0 {
184        return Err(DeviceWorkQueueError::ZeroCapacity);
185    }
186    if profile.entry_bytes == 0 {
187        return Err(DeviceWorkQueueError::ZeroEntryBytes);
188    }
189    if profile.initial_items > profile.queue_capacity {
190        return Err(DeviceWorkQueueError::InitialItemsExceedCapacity {
191            initial_items: profile.initial_items,
192            queue_capacity: profile.queue_capacity,
193        });
194    }
195    if profile.host_sync != WorkQueueHostSync::FinalOnly {
196        return Err(DeviceWorkQueueError::HostParticipationRejected);
197    }
198
199    let queue_bytes = checked_mul(profile.queue_capacity, profile.entry_bytes, "queue bytes")?;
200    let resident_bytes = checked_add(queue_bytes, profile.control_bytes, "resident bytes")?;
201    if resident_bytes > profile.budget_bytes {
202        return Err(DeviceWorkQueueError::OverBudget {
203            required_bytes: resident_bytes,
204            budget_bytes: profile.budget_bytes,
205        });
206    }
207    let initial_occupancy_bps = DEVICE_WORK_QUEUE_NUMERIC.ratio_basis_points_u64(
208        profile.initial_items,
209        profile.queue_capacity,
210        0,
211        "device work queue initial occupancy",
212    );
213
214    Ok(DeviceWorkQueuePlan {
215        queue_bytes,
216        control_bytes: profile.control_bytes,
217        resident_bytes,
218        initial_occupancy_bps,
219        final_only_host_sync: true,
220    })
221}
222
223/// Plan a device-resident work queue that preserves initial-frontier capacity
224/// and uses remaining queue budget for device-side expansion headroom.
225pub fn plan_device_work_queue_with_expansion(
226    profile: DeviceWorkQueueExpansionProfile,
227) -> Result<DeviceWorkQueuePlan, DeviceWorkQueueError> {
228    let desired_capacity = checked_add(
229        profile.initial_items,
230        profile.expansion_items,
231        "queue expansion capacity",
232    )?;
233    if profile.entry_bytes == 0 {
234        return plan_device_work_queue(DeviceWorkQueueProfile {
235            initial_items: profile.initial_items,
236            queue_capacity: desired_capacity,
237            entry_bytes: profile.entry_bytes,
238            control_bytes: profile.control_bytes,
239            budget_bytes: profile.budget_bytes,
240            host_sync: profile.host_sync,
241        });
242    }
243    let budget_capacity =
244        profile.budget_bytes.saturating_sub(profile.control_bytes) / profile.entry_bytes;
245    let queue_capacity = desired_capacity
246        .min(budget_capacity)
247        .max(profile.initial_items);
248    plan_device_work_queue(DeviceWorkQueueProfile {
249        initial_items: profile.initial_items,
250        queue_capacity,
251        entry_bytes: profile.entry_bytes,
252        control_bytes: profile.control_bytes,
253        budget_bytes: profile.budget_bytes,
254        host_sync: profile.host_sync,
255    })
256}
257
258/// Plan a device-resident work queue plus bounded device-side drain windows.
259pub fn plan_device_work_queue_backpressure(
260    profile: DeviceWorkQueueProfile,
261    max_items_per_drain_launch: u64,
262) -> Result<DeviceWorkQueueBackpressurePlan, DeviceWorkQueueError> {
263    if max_items_per_drain_launch == 0 {
264        return Err(DeviceWorkQueueError::ZeroDrainChunk);
265    }
266    let queue = plan_device_work_queue(profile)?;
267    let chunks = div_ceil_u64(
268        profile.queue_capacity,
269        max_items_per_drain_launch,
270        "drain chunks",
271    )?;
272    let strategy = if chunks == 1 {
273        DeviceWorkQueueDrainStrategy::SingleResidentDrain
274    } else {
275        DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
276    };
277    Ok(DeviceWorkQueueBackpressurePlan {
278        queue,
279        strategy,
280        items_per_chunk: max_items_per_drain_launch.min(profile.queue_capacity),
281        chunks,
282        final_only_host_sync: true,
283    })
284}
285
286fn div_ceil_u64(lhs: u64, rhs: u64, field: &'static str) -> Result<u64, DeviceWorkQueueError> {
287    DEVICE_WORK_QUEUE_NUMERIC
288        .checked_ceil_div_u64(lhs, rhs)
289        .ok_or(DeviceWorkQueueError::ByteCountOverflow { field })
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Source-level guard: the production half of this file must route its
    /// arithmetic through the shared driver numeric policy.
    #[test]
    fn device_work_queue_uses_shared_driver_numeric_policy() {
        let source = include_str!("device_work_queue.rs");
        let production = source
            .split("#[cfg(test)]")
            .next()
            .expect("Fix: device work-queue source must contain production section");

        // Probe only the production half. The full source also contains the
        // string literals of this very test, so searching it would make the
        // positive assertions below pass unconditionally.
        assert!(production.contains("BackendNumericPolicy::new"));
        assert!(production.contains("DEVICE_WORK_QUEUE_NUMERIC"));
        assert!(production.contains("checked_ceil_div_u64"));
        assert!(production.contains("fn checked_mul("));
        assert!(production.contains("fn checked_add("));
        assert!(!production.contains("CudaArithmeticOverflow"));
    }

    #[test]
    fn device_work_queue_plans_final_only_resident_execution() {
        let plan = plan_device_work_queue(DeviceWorkQueueProfile {
            initial_items: 256,
            queue_capacity: 1_024,
            entry_bytes: 16,
            control_bytes: 128,
            budget_bytes: 32_768,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: valid device work queue should plan");

        assert_eq!(plan.queue_bytes, 16_384);
        assert_eq!(plan.control_bytes, 128);
        assert_eq!(plan.resident_bytes, 16_512);
        assert_eq!(plan.initial_occupancy_bps, 2_500);
        assert!(plan.final_only_host_sync);
    }

    #[test]
    fn device_work_queue_expansion_uses_budgeted_resident_headroom() {
        let plan = plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
            initial_items: 4,
            expansion_items: 12,
            entry_bytes: 8,
            control_bytes: 64,
            budget_bytes: 256,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: expansion headroom should fit inside the explicit queue budget");

        assert_eq!(plan.queue_bytes, 128);
        assert_eq!(plan.control_bytes, 64);
        assert_eq!(plan.resident_bytes, 192);
        assert_eq!(
            plan.initial_occupancy_bps, 2_500,
            "Fix: occupancy must use the expanded resident queue capacity"
        );
        assert!(plan.final_only_host_sync);
    }

    #[test]
    fn device_work_queue_expansion_clamps_to_budget_without_dropping_initial_items() {
        let plan = plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
            initial_items: 4,
            expansion_items: 100,
            entry_bytes: 8,
            control_bytes: 16,
            budget_bytes: 96,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: queue expansion should use all affordable headroom");

        assert_eq!(plan.queue_bytes, 80);
        assert_eq!(plan.resident_bytes, 96);
        assert_eq!(
            plan.initial_occupancy_bps, 4_000,
            "Fix: initial occupancy should reflect budget-clamped expansion capacity"
        );
    }

    #[test]
    fn device_work_queue_expansion_fails_when_initial_frontier_cannot_fit() {
        assert_eq!(
            plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                initial_items: 8,
                expansion_items: 100,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 128,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("initial frontier must fail when it cannot fit the explicit budget"),
            DeviceWorkQueueError::OverBudget {
                required_bytes: 192,
                budget_bytes: 128,
            }
        );
    }

    #[test]
    fn device_work_queue_expansion_rejects_capacity_overflow() {
        assert_eq!(
            plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                initial_items: u64::MAX,
                expansion_items: 1,
                entry_bytes: 1,
                control_bytes: 0,
                budget_bytes: u64::MAX,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("overflowed expansion capacity must fail before queue planning"),
            DeviceWorkQueueError::ByteCountOverflow {
                field: "queue expansion capacity",
            }
        );
    }

    #[test]
    fn device_work_queue_rejects_host_participation() {
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::HostParticipates,
            })
            .expect_err("host participation should fail"),
            DeviceWorkQueueError::HostParticipationRejected
        );
    }

    #[test]
    fn device_work_queue_rejects_invalid_capacity_and_budget() {
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 9,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("initial overflow should fail"),
            DeviceWorkQueueError::InitialItemsExceedCapacity {
                initial_items: 9,
                queue_capacity: 8,
            }
        );
        assert_eq!(
            plan_device_work_queue(DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 128,
                host_sync: WorkQueueHostSync::FinalOnly,
            })
            .expect_err("over-budget queue should fail"),
            DeviceWorkQueueError::OverBudget {
                required_bytes: 192,
                budget_bytes: 128,
            }
        );
    }

    #[test]
    fn device_work_queue_occupancy_uses_widened_arithmetic_for_huge_queues() {
        let plan = plan_device_work_queue(DeviceWorkQueueProfile {
            initial_items: u64::MAX,
            queue_capacity: u64::MAX,
            entry_bytes: 1,
            control_bytes: 0,
            budget_bytes: u64::MAX,
            host_sync: WorkQueueHostSync::FinalOnly,
        })
        .expect("Fix: max-sized byte queue should fit exactly");

        assert_eq!(
            plan.initial_occupancy_bps, 10_000,
            "Fix: device work-queue occupancy must not use saturating u64 multiplication before division; full queues must report 10000 bps even near u64::MAX."
        );
    }

    #[test]
    fn device_work_queue_occupancy_uses_shared_numeric_helper() {
        let source = include_str!("device_work_queue.rs");

        // `concat!` keeps the needle from appearing verbatim in this test, so
        // the match can only come from the production call site.
        assert!(
            source.contains(concat!("DEVICE_WORK_QUEUE_NUMERIC.", "ratio_basis_points_u64")),
            "Fix: device work-queue occupancy must use the shared driver numeric ratio helper instead of a backend-local basis-point formula."
        );
    }

    #[test]
    fn device_work_queue_backpressure_chunks_large_resident_queues_without_host_participation() {
        let plan = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: 4_096,
                queue_capacity: 65_536,
                entry_bytes: 16,
                control_bytes: 128,
                budget_bytes: 2 << 20,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            8_192,
        )
        .expect("Fix: large resident work queue should plan bounded device-side drain chunks");

        assert_eq!(
            plan.strategy,
            DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
        );
        assert_eq!(plan.items_per_chunk, 8_192);
        assert_eq!(plan.chunks, 8);
        assert_eq!(plan.queue.resident_bytes, 1_048_704);
        assert!(plan.final_only_host_sync);
        assert!(plan.queue.final_only_host_sync);
    }

    #[test]
    fn device_work_queue_backpressure_ceil_division_handles_max_capacity() {
        let plan = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: u64::MAX,
                queue_capacity: u64::MAX,
                entry_bytes: 1,
                control_bytes: 0,
                budget_bytes: u64::MAX,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            65_536,
        )
        .expect("Fix: ceil division for max-capacity queues must not overflow");

        assert_eq!(
            plan.strategy,
            DeviceWorkQueueDrainStrategy::ChunkedResidentDrain
        );
        assert_eq!(plan.queue.queue_bytes, u64::MAX);
        assert_eq!(plan.items_per_chunk, 65_536);
        assert_eq!(plan.chunks, 281_474_976_710_656);
        assert!(plan.final_only_host_sync);
    }

    #[test]
    fn device_work_queue_backpressure_rejects_zero_drain_chunk() {
        let err = plan_device_work_queue_backpressure(
            DeviceWorkQueueProfile {
                initial_items: 1,
                queue_capacity: 8,
                entry_bytes: 16,
                control_bytes: 64,
                budget_bytes: 1_024,
                host_sync: WorkQueueHostSync::FinalOnly,
            },
            0,
        )
        .expect_err("zero drain chunk must fail loudly");

        assert_eq!(err, DeviceWorkQueueError::ZeroDrainChunk);
    }

    /// Deterministic pseudo-random sweep: every generated profile is valid by
    /// construction, so all three planners must succeed and stay within the
    /// budget they were given.
    #[test]
    fn generated_device_work_queue_profiles_preserve_budget_and_sync_contracts() {
        let mut state = 0xa409_3822_299f_31d0_u64;
        for case_index in 0..2048usize {
            let queue_capacity = 1 + next_u64(&mut state) % 262_144;
            let entry_bytes = 1 + next_u64(&mut state) % 256;
            let initial_items = next_u64(&mut state) % (queue_capacity + 1);
            let control_bytes = next_u64(&mut state) % 4096;
            let queue_bytes = queue_capacity
                .checked_mul(entry_bytes)
                .expect("Fix: generated queue byte count should fit");
            let resident_bytes = queue_bytes
                .checked_add(control_bytes)
                .expect("Fix: generated resident byte count should fit");
            let budget_bytes = resident_bytes + (next_u64(&mut state) % 8192);
            let profile = DeviceWorkQueueProfile {
                initial_items,
                queue_capacity,
                entry_bytes,
                control_bytes,
                budget_bytes,
                host_sync: WorkQueueHostSync::FinalOnly,
            };

            let plan = plan_device_work_queue(profile)
                .expect("Fix: generated valid queue profile must plan");
            assert_eq!(plan.queue_bytes, queue_bytes, "case {case_index}");
            assert_eq!(plan.control_bytes, control_bytes, "case {case_index}");
            assert_eq!(plan.resident_bytes, resident_bytes, "case {case_index}");
            assert!(plan.resident_bytes <= budget_bytes, "case {case_index}");
            assert!(plan.initial_occupancy_bps <= 10_000, "case {case_index}");
            assert!(plan.final_only_host_sync, "case {case_index}");

            let drain = 1 + next_u64(&mut state) % queue_capacity;
            let backpressure = plan_device_work_queue_backpressure(profile, drain)
                .expect("Fix: generated valid backpressure profile must plan");
            assert_eq!(backpressure.queue, plan, "case {case_index}");
            assert!(
                backpressure.items_per_chunk <= queue_capacity,
                "case {case_index}"
            );
            assert!(backpressure.chunks >= 1, "case {case_index}");
            assert!(backpressure.final_only_host_sync, "case {case_index}");

            let expansion_items = next_u64(&mut state) % queue_capacity;
            let expansion_budget = resident_bytes + (expansion_items * entry_bytes);
            let expansion =
                plan_device_work_queue_with_expansion(DeviceWorkQueueExpansionProfile {
                    initial_items,
                    expansion_items,
                    entry_bytes,
                    control_bytes,
                    budget_bytes: expansion_budget,
                    host_sync: WorkQueueHostSync::FinalOnly,
                })
                .expect("Fix: generated valid expansion queue profile must plan");
            assert!(
                expansion.resident_bytes <= expansion_budget,
                "case {case_index}"
            );
            assert!(
                expansion.queue_bytes >= initial_items * entry_bytes,
                "case {case_index}"
            );
            assert!(expansion.final_only_host_sync, "case {case_index}");
        }
    }

    /// Advance `state` by one wrapping multiply-add step (a 64-bit linear
    /// congruential generator) and return the new state.
    fn next_u64(state: &mut u64) -> u64 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1_442_695_040_888_963_407);
        *state
    }
}