// nexus_core/device_scheduler.rs

use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::Result;
/// Per-device bookkeeping shared (via `Arc`) across the scheduler.
#[derive(Debug)]
pub struct DeviceState {
    /// Stable index assigned to this device at scheduler construction.
    pub id: usize,
    /// Filesystem path backing the device.
    pub path: PathBuf,
    // Counter of outstanding work queued against this device; presumably
    // in-flight I/O operations — maintained by the increment/decrement pair.
    queue_depth: AtomicU64,
    // Next unclaimed byte offset on the device; grows monotonically as
    // spans are reserved.
    next_offset: AtomicU64,
}
14
15impl DeviceState {
16 pub fn new(id: usize, path: PathBuf) -> Self {
17 Self {
18 id,
19 path,
20 queue_depth: AtomicU64::new(0),
21 next_offset: AtomicU64::new(0),
22 }
23 }
24
25 pub fn queue_depth(&self) -> u64 {
26 self.queue_depth.load(Ordering::Relaxed)
27 }
28
29 pub fn reserve_aligned_offset(&self, len: u64, alignment: u64) -> u64 {
30 let aligned_len = align_up_u64(len, alignment);
31 self.next_offset.fetch_add(aligned_len, Ordering::AcqRel)
32 }
33
34 pub fn increment_queue_depth(&self) {
35 self.queue_depth.fetch_add(1, Ordering::AcqRel);
36 }
37
38 pub fn decrement_queue_depth(&self) {
39 self.queue_depth
40 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |depth| {
41 Some(depth.saturating_sub(1))
42 })
43 .ok();
44 }
45}
46
47#[derive(Debug, Clone)]
48pub struct DeviceScheduler {
49 devices: Vec<Arc<DeviceState>>,
50 rr_counter: Arc<AtomicUsize>,
51}
52
53#[derive(Debug, Clone)]
54pub struct ScheduledDevice {
55 pub device: Arc<DeviceState>,
56 pub reserved_offset: u64,
57}
58
59impl DeviceScheduler {
60 pub fn new(device_paths: &[PathBuf]) -> Result<Self> {
61 if device_paths.is_empty() {
62 anyhow::bail!("at least one device path is required");
63 }
64 if device_paths.len() > 5 {
65 anyhow::bail!("at most 5 data devices are supported");
66 }
67
68 let devices = device_paths
69 .iter()
70 .enumerate()
71 .map(|(idx, path)| Arc::new(DeviceState::new(idx, path.clone())))
72 .collect::<Vec<_>>();
73
74 Ok(Self {
75 devices,
76 rr_counter: Arc::new(AtomicUsize::new(0)),
77 })
78 }
79
80 pub fn devices(&self) -> &[Arc<DeviceState>] {
81 &self.devices
82 }
83
84 pub fn select_and_reserve(&self, len: u64, alignment: u64) -> ScheduledDevice {
85 let idx = self.select_device_index();
86 let device = self.devices[idx].clone();
87 let reserved_offset = device.reserve_aligned_offset(len, alignment);
88 ScheduledDevice {
89 device,
90 reserved_offset,
91 }
92 }
93
94 pub fn telemetry(&self) -> Vec<(usize, PathBuf, u64)> {
95 self.devices
96 .iter()
97 .map(|device| (device.id, device.path.clone(), device.queue_depth()))
98 .collect()
99 }
100
101 fn select_device_index(&self) -> usize {
102 if self.devices.len() == 1 {
103 return 0;
104 }
105
106 let mut min_depth = u64::MAX;
107 let mut candidates = Vec::<usize>::new();
108
109 for (idx, device) in self.devices.iter().enumerate() {
110 let depth = device.queue_depth();
111 match depth.cmp(&min_depth) {
112 std::cmp::Ordering::Less => {
113 min_depth = depth;
114 candidates.clear();
115 candidates.push(idx);
116 }
117 std::cmp::Ordering::Equal => {
118 candidates.push(idx);
119 }
120 std::cmp::Ordering::Greater => {}
121 }
122 }
123
124 if candidates.len() == 1 {
125 return candidates[0];
126 }
127
128 let start = self.rr_counter.fetch_add(1, Ordering::AcqRel) % self.devices.len();
129 for offset in 0..self.devices.len() {
130 let idx = (start + offset) % self.devices.len();
131 if candidates.contains(&idx) {
132 return idx;
133 }
134 }
135 candidates[0]
136 }
137}
138
/// Rounds `value` up to the next multiple of `alignment`.
///
/// Returns `value` unchanged when it is already a multiple of `alignment`.
///
/// # Panics
/// Panics with an explicit message when `alignment` is zero (previously this
/// surfaced as a raw divide-by-zero panic). In debug builds it also panics on
/// overflow when `value` is within `alignment` of `u64::MAX`.
pub fn align_up_u64(value: u64, alignment: u64) -> u64 {
    assert!(alignment > 0, "alignment must be non-zero");
    // Compute the remainder once instead of twice as the original did.
    let remainder = value % alignment;
    if remainder == 0 {
        value
    } else {
        value + (alignment - remainder)
    }
}
146
#[cfg(test)]
mod tests {
    use super::*;

    // Two reservations that land on the same device must claim distinct
    // offsets; reservations on different devices may legitimately share one.
    #[test]
    fn select_and_reserve_never_overlaps_offsets() {
        let paths = [
            PathBuf::from("/dev/device-a"),
            PathBuf::from("/dev/device-b"),
        ];
        let scheduler = DeviceScheduler::new(&paths).expect("scheduler init should succeed");
        let (first, second) = (
            scheduler.select_and_reserve(4096, 4096),
            scheduler.select_and_reserve(4096, 4096),
        );
        if first.device.id == second.device.id {
            assert_ne!(first.reserved_offset, second.reserved_offset);
        }
    }

    // Unaligned lengths round up; already-aligned lengths pass through.
    #[test]
    fn align_up_u64_rounds_to_boundary() {
        for (value, expected) in [(4097, 8192), (8192, 8192)] {
            assert_eq!(align_up_u64(value, 4096), expected);
        }
    }

    // Six paths exceed the five-device cap and must be rejected.
    #[test]
    fn scheduler_rejects_more_than_five_devices() {
        let mut paths = Vec::with_capacity(6);
        for idx in 0..6 {
            paths.push(PathBuf::from(format!("/dev/device-{idx}")));
        }
        let err = DeviceScheduler::new(&paths).expect_err("expected validation failure");
        assert!(err.to_string().contains("at most 5 data devices"));
    }
}
179}