1pub mod direct_file;
16pub mod direct_fs;
17pub mod monitor;
18
19use std::{
20 fmt::{Debug, Display},
21 future::Future,
22 num::NonZeroUsize,
23 str::FromStr,
24};
25
26use direct_file::DirectFileDeviceConfig;
27use direct_fs::DirectFsDeviceConfig;
28use monitor::Monitored;
29
30use crate::{
31 error::Result,
32 io::{
33 buffer::{IoBuf, IoBufMut},
34 PAGE,
35 },
36 DirectFileDevice, DirectFileDeviceOptions, DirectFsDevice, DirectFsDeviceOptions, Runtime,
37};
38
39#[cfg(test)]
40pub mod test_utils;
41#[cfg(test)]
42use test_utils::NoopDevice;
43
/// Integer type used to address a region in [`Dev`] I/O calls.
pub type RegionId = u32;

/// Marker trait for types usable as a device configuration.
///
/// It declares no methods; it only bundles the `Debug + Send + Sync + 'static`
/// bounds required of [`Dev::Config`].
pub trait DevConfig: Debug + Send + Sync + 'static {}

/// Blanket implementation: every type meeting the bounds is a [`DevConfig`].
impl<T> DevConfig for T where T: Debug + Send + Sync + 'static {}
49
/// Strategy for converting one I/O of a given byte size into a number of IOPS units.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum IopsCounter {
    /// Every I/O counts as exactly one unit, whatever its size.
    PerIo,
    /// An I/O counts as one unit per started chunk of the given size.
    PerIoSize(NonZeroUsize),
}

impl IopsCounter {
    /// Creates a counter that charges one unit per I/O.
    pub fn per_io() -> Self {
        Self::PerIo
    }

    /// Creates a counter that charges one unit per `io_size` bytes (rounded up).
    ///
    /// # Panics
    ///
    /// Panics if `io_size` is zero.
    pub fn per_io_size(io_size: usize) -> Self {
        let chunk = NonZeroUsize::new(io_size).expect("io size must be non-zero");
        Self::PerIoSize(chunk)
    }

    /// Returns how many IOPS units an I/O of `bytes` bytes is charged.
    ///
    /// For [`IopsCounter::PerIoSize`] this is `bytes / size` rounded up, so a
    /// zero-byte I/O is charged zero units.
    pub fn count(&self, bytes: usize) -> usize {
        match *self {
            Self::PerIo => 1,
            Self::PerIoSize(chunk) => bytes.div_ceil(chunk.get()),
        }
    }
}

/// Renders the counter as `PerIo` or `PerIoSize(<n>)`.
impl Display for IopsCounter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PerIo => f.write_str("PerIo"),
            Self::PerIoSize(chunk) => write!(f, "PerIoSize({})", chunk),
        }
    }
}
90
91impl FromStr for IopsCounter {
92 type Err = anyhow::Error;
93
94 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
95 match s.trim() {
96 "PerIo" => Ok(IopsCounter::PerIo),
97 _ if s.starts_with("PerIoSize(") && s.ends_with(')') => {
98 let num = &s[10..s.len() - 1];
99 let v = num.parse::<NonZeroUsize>()?;
100 Ok(IopsCounter::PerIoSize(v))
101 }
102 _ => Err(anyhow::anyhow!("Invalid IopsCounter format: {}", s)),
103 }
104 }
105}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
109#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
110#[cfg_attr(feature = "clap", derive(clap::Args))]
111pub struct Throttle {
112 #[cfg_attr(feature = "clap", clap(long))]
114 pub write_iops: Option<NonZeroUsize>,
115 #[cfg_attr(feature = "clap", clap(long))]
117 pub read_iops: Option<NonZeroUsize>,
118 #[cfg_attr(feature = "clap", clap(long))]
120 pub write_throughput: Option<NonZeroUsize>,
121 #[cfg_attr(feature = "clap", clap(long))]
123 pub read_throughput: Option<NonZeroUsize>,
124 #[cfg_attr(feature = "clap", clap(long, default_value = "PerIo"))]
126 pub iops_counter: IopsCounter,
127}
128
129impl Default for Throttle {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl Throttle {
136 pub fn new() -> Self {
138 Self {
139 write_iops: None,
140 read_iops: None,
141 write_throughput: None,
142 read_throughput: None,
143 iops_counter: IopsCounter::PerIo,
144 }
145 }
146
147 pub fn with_write_iops(mut self, iops: usize) -> Self {
149 self.write_iops = NonZeroUsize::new(iops);
150 self
151 }
152
153 pub fn with_read_iops(mut self, iops: usize) -> Self {
155 self.read_iops = NonZeroUsize::new(iops);
156 self
157 }
158
159 pub fn with_write_throughput(mut self, throughput: usize) -> Self {
161 self.write_throughput = NonZeroUsize::new(throughput);
162 self
163 }
164
165 pub fn with_read_throughput(mut self, throughput: usize) -> Self {
167 self.read_throughput = NonZeroUsize::new(throughput);
168 self
169 }
170
171 pub fn with_iops_counter(mut self, counter: IopsCounter) -> Self {
173 self.iops_counter = counter;
174 self
175 }
176}
177
178pub trait Dev: Send + Sync + 'static + Sized + Clone + Debug {
182 type Config: DevConfig;
184
185 fn capacity(&self) -> usize;
187
188 fn region_size(&self) -> usize;
190
191 fn throttle(&self) -> &Throttle;
193
194 #[must_use]
196 fn open(config: Self::Config, runtime: Runtime) -> impl Future<Output = Result<Self>> + Send;
197
198 #[must_use]
200 fn write<B>(&self, buf: B, region: RegionId, offset: u64) -> impl Future<Output = (B, Result<()>)> + Send
201 where
202 B: IoBuf;
203
204 #[must_use]
206 fn read<B>(&self, buf: B, region: RegionId, offset: u64) -> impl Future<Output = (B, Result<()>)> + Send
207 where
208 B: IoBufMut;
209
210 #[must_use]
212 fn sync(&self, region: Option<RegionId>) -> impl Future<Output = Result<()>> + Send;
213}
214
215pub trait DevExt: Dev {
217 fn align(&self) -> usize {
219 PAGE
220 }
221
222 fn regions(&self) -> usize {
224 self.capacity() / self.region_size()
225 }
226}
227
228impl<T> DevExt for T where T: Dev {}
229
230#[derive(Debug, Clone)]
231pub enum DeviceConfig {
232 DirectFile(DirectFileDeviceConfig),
233 DirectFs(DirectFsDeviceConfig),
234 #[cfg(test)]
235 Noop,
236}
237
238impl From<DirectFileDeviceOptions> for DeviceConfig {
239 fn from(options: DirectFileDeviceOptions) -> Self {
240 Self::DirectFile(options.into())
241 }
242}
243
244impl From<DirectFsDeviceOptions> for DeviceConfig {
245 fn from(options: DirectFsDeviceOptions) -> Self {
246 Self::DirectFs(options.into())
247 }
248}
249
250#[cfg(test)]
251impl From<()> for DeviceConfig {
252 fn from(_: ()) -> Self {
253 Self::Noop
254 }
255}
256
257#[derive(Debug, Clone)]
258pub enum Device {
259 DirectFile(DirectFileDevice),
260 DirectFs(DirectFsDevice),
261 #[cfg(test)]
262 Noop(NoopDevice),
263}
264
265impl Dev for Device {
266 type Config = DeviceConfig;
267
268 fn capacity(&self) -> usize {
269 match self {
270 Device::DirectFile(dev) => dev.capacity(),
271 Device::DirectFs(dev) => dev.capacity(),
272 #[cfg(test)]
273 Device::Noop(dev) => dev.capacity(),
274 }
275 }
276
277 fn region_size(&self) -> usize {
278 match self {
279 Device::DirectFile(dev) => dev.region_size(),
280 Device::DirectFs(dev) => dev.region_size(),
281 #[cfg(test)]
282 Device::Noop(dev) => dev.region_size(),
283 }
284 }
285
286 async fn open(options: Self::Config, runtime: Runtime) -> Result<Self> {
287 match options {
288 DeviceConfig::DirectFile(opts) => Ok(Self::DirectFile(DirectFileDevice::open(opts, runtime).await?)),
289 DeviceConfig::DirectFs(opts) => Ok(Self::DirectFs(DirectFsDevice::open(opts, runtime).await?)),
290 #[cfg(test)]
291 DeviceConfig::Noop => Ok(Self::Noop(NoopDevice::open((), runtime).await?)),
292 }
293 }
294
295 fn throttle(&self) -> &Throttle {
296 match self {
297 Device::DirectFile(dev) => dev.throttle(),
298 Device::DirectFs(dev) => dev.throttle(),
299 #[cfg(test)]
300 Device::Noop(dev) => dev.throttle(),
301 }
302 }
303
304 async fn write<B>(&self, buf: B, region: RegionId, offset: u64) -> (B, Result<()>)
305 where
306 B: IoBuf,
307 {
308 match self {
309 Device::DirectFile(dev) => dev.write(buf, region, offset).await,
310 Device::DirectFs(dev) => dev.write(buf, region, offset).await,
311 #[cfg(test)]
312 Device::Noop(dev) => dev.write(buf, region, offset).await,
313 }
314 }
315
316 async fn read<B>(&self, buf: B, region: RegionId, offset: u64) -> (B, Result<()>)
317 where
318 B: IoBufMut,
319 {
320 match self {
321 Device::DirectFile(dev) => dev.read(buf, region, offset).await,
322 Device::DirectFs(dev) => dev.read(buf, region, offset).await,
323 #[cfg(test)]
324 Device::Noop(dev) => dev.read(buf, region, offset).await,
325 }
326 }
327
328 async fn sync(&self, region: Option<RegionId>) -> Result<()> {
329 match self {
330 Device::DirectFile(dev) => dev.sync(region).await,
331 Device::DirectFs(dev) => dev.sync(region).await,
332 #[cfg(test)]
333 Device::Noop(dev) => dev.sync(region).await,
334 }
335 }
336}
337
338pub type MonitoredDevice = Monitored<Device>;