//! `nexus_core/device_scheduler.rs` — distributes reservations across data
//! devices by queue depth, breaking ties round-robin.

1use std::path::PathBuf;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4
5use anyhow::Result;
6
/// Per-device bookkeeping shared between the scheduler and I/O submitters.
#[derive(Debug)]
pub struct DeviceState {
    /// Stable index of the device within the scheduler's device list.
    pub id: usize,
    /// Filesystem path of the underlying device.
    pub path: PathBuf,
    // Count of in-flight operations; drives least-depth device selection.
    queue_depth: AtomicU64,
    // Next free byte offset on this device; only ever advances.
    next_offset: AtomicU64,
}

impl DeviceState {
    /// Creates a fresh device record with zero queue depth and zero offset.
    pub fn new(id: usize, path: PathBuf) -> Self {
        Self {
            id,
            path,
            queue_depth: AtomicU64::new(0),
            next_offset: AtomicU64::new(0),
        }
    }

    /// Current number of in-flight operations (relaxed snapshot; may be
    /// stale the moment it is read).
    pub fn queue_depth(&self) -> u64 {
        self.queue_depth.load(Ordering::Relaxed)
    }

    /// Atomically reserves room for `len` bytes and returns the starting
    /// offset of the reservation, guaranteed to be a multiple of `alignment`.
    ///
    /// Both the start offset and the reserved length are rounded up, so
    /// mixing different alignments across calls can no longer produce a
    /// misaligned start (previously only the length was aligned).
    ///
    /// # Panics
    /// Panics if `alignment` is zero.
    pub fn reserve_aligned_offset(&self, len: u64, alignment: u64) -> u64 {
        assert!(alignment > 0, "alignment must be non-zero");
        // Round `v` up to the next multiple of `alignment`.
        let align_up = |v: u64| match v % alignment {
            0 => v,
            rem => v + (alignment - rem),
        };
        let previous = self
            .next_offset
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
                // Advance past an aligned start by an aligned length
                // (wrapping add mirrors the old fetch_add overflow behavior).
                Some(align_up(cur).wrapping_add(align_up(len)))
            })
            .expect("closure always returns Some");
        // The closure is deterministic, so recomputing the aligned start
        // from the witnessed previous value yields exactly what was reserved.
        align_up(previous)
    }

    /// Records one more operation in flight on this device.
    pub fn increment_queue_depth(&self) {
        self.queue_depth.fetch_add(1, Ordering::AcqRel);
    }

    /// Records one completed operation; saturates at zero so a spurious
    /// extra decrement cannot wrap the counter to `u64::MAX`.
    pub fn decrement_queue_depth(&self) {
        self.queue_depth
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |depth| {
                Some(depth.saturating_sub(1))
            })
            .ok();
    }
}
46
/// Distributes reservations across a fixed set of data devices.
///
/// Cloning is cheap and shares state: both fields are behind `Arc`, so all
/// clones observe the same devices and the same round-robin counter.
#[derive(Debug, Clone)]
pub struct DeviceScheduler {
    // Per-device state, indexed by `DeviceState::id`.
    devices: Vec<Arc<DeviceState>>,
    // Shared cursor used to break ties between equally-loaded devices.
    rr_counter: Arc<AtomicUsize>,
}
52
/// Outcome of one scheduling decision made by `DeviceScheduler::select_and_reserve`:
/// the chosen device together with the offset reserved on it.
#[derive(Debug, Clone)]
pub struct ScheduledDevice {
    /// The device selected for this operation.
    pub device: Arc<DeviceState>,
    /// Byte offset reserved on `device` for the caller's exclusive use.
    pub reserved_offset: u64,
}
58
59impl DeviceScheduler {
60    pub fn new(device_paths: &[PathBuf]) -> Result<Self> {
61        if device_paths.is_empty() {
62            anyhow::bail!("at least one device path is required");
63        }
64        if device_paths.len() > 5 {
65            anyhow::bail!("at most 5 data devices are supported");
66        }
67
68        let devices = device_paths
69            .iter()
70            .enumerate()
71            .map(|(idx, path)| Arc::new(DeviceState::new(idx, path.clone())))
72            .collect::<Vec<_>>();
73
74        Ok(Self {
75            devices,
76            rr_counter: Arc::new(AtomicUsize::new(0)),
77        })
78    }
79
80    pub fn devices(&self) -> &[Arc<DeviceState>] {
81        &self.devices
82    }
83
84    pub fn select_and_reserve(&self, len: u64, alignment: u64) -> ScheduledDevice {
85        let idx = self.select_device_index();
86        let device = self.devices[idx].clone();
87        let reserved_offset = device.reserve_aligned_offset(len, alignment);
88        ScheduledDevice {
89            device,
90            reserved_offset,
91        }
92    }
93
94    pub fn telemetry(&self) -> Vec<(usize, PathBuf, u64)> {
95        self.devices
96            .iter()
97            .map(|device| (device.id, device.path.clone(), device.queue_depth()))
98            .collect()
99    }
100
101    fn select_device_index(&self) -> usize {
102        if self.devices.len() == 1 {
103            return 0;
104        }
105
106        let mut min_depth = u64::MAX;
107        let mut candidates = Vec::<usize>::new();
108
109        for (idx, device) in self.devices.iter().enumerate() {
110            let depth = device.queue_depth();
111            match depth.cmp(&min_depth) {
112                std::cmp::Ordering::Less => {
113                    min_depth = depth;
114                    candidates.clear();
115                    candidates.push(idx);
116                }
117                std::cmp::Ordering::Equal => {
118                    candidates.push(idx);
119                }
120                std::cmp::Ordering::Greater => {}
121            }
122        }
123
124        if candidates.len() == 1 {
125            return candidates[0];
126        }
127
128        let start = self.rr_counter.fetch_add(1, Ordering::AcqRel) % self.devices.len();
129        for offset in 0..self.devices.len() {
130            let idx = (start + offset) % self.devices.len();
131            if candidates.contains(&idx) {
132                return idx;
133            }
134        }
135        candidates[0]
136    }
137}
138
/// Rounds `value` up to the nearest multiple of `alignment`.
///
/// # Panics
/// Panics if `alignment` is zero (remainder by zero), and in debug builds
/// if the rounded result would overflow `u64`.
pub fn align_up_u64(value: u64, alignment: u64) -> u64 {
    // Compute the remainder once instead of twice.
    match value % alignment {
        0 => value,
        remainder => value + (alignment - remainder),
    }
}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn select_and_reserve_never_overlaps_offsets() {
153        let paths = vec![
154            PathBuf::from("/dev/device-a"),
155            PathBuf::from("/dev/device-b"),
156        ];
157        let scheduler = DeviceScheduler::new(&paths).expect("scheduler init should succeed");
158        let first = scheduler.select_and_reserve(4096, 4096);
159        let second = scheduler.select_and_reserve(4096, 4096);
160        if first.device.id == second.device.id {
161            assert_ne!(first.reserved_offset, second.reserved_offset);
162        }
163    }
164
165    #[test]
166    fn align_up_u64_rounds_to_boundary() {
167        assert_eq!(align_up_u64(4097, 4096), 8192);
168        assert_eq!(align_up_u64(8192, 4096), 8192);
169    }
170
171    #[test]
172    fn scheduler_rejects_more_than_five_devices() {
173        let paths = (0..6)
174            .map(|idx| PathBuf::from(format!("/dev/device-{idx}")))
175            .collect::<Vec<_>>();
176        let err = DeviceScheduler::new(&paths).expect_err("expected validation failure");
177        assert!(err.to_string().contains("at most 5 data devices"));
178    }
179}