1use std::path::PathBuf;
2
3use anyhow::Result;
4
/// Byte granularity that this module requires of buffer addresses, write
/// lengths and file offsets before issuing an `O_DIRECT` write (4 KiB).
#[cfg(target_os = "linux")]
const ALIGNMENT: usize = 4 * 1024;
7
/// Parameters for one module-D direct-I/O write run.
///
/// `PartialEq`/`Eq` are derived so that configurations can be compared
/// directly (for example in tests); every field supports structural equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModuleDConfig {
    /// Path that is opened (write-only, `O_DIRECT | O_DSYNC`) as the write target.
    pub target_path: PathBuf,
    /// Total number of payload bytes to write; validated to be non-zero and a
    /// multiple of the 4096-byte alignment.
    pub total_bytes: u64,
    /// Size of one grouped write ("commit"); validated to be non-zero, aligned,
    /// and at least `request_bytes`.
    pub group_bytes: usize,
    /// Size of one producer-generated chunk; validated to be non-zero and aligned.
    pub request_bytes: usize,
    /// Number of producer tasks/threads generating chunks; must be > 0.
    pub producers: usize,
    /// Seed mixed into the pseudo-random payload generator.
    pub seed: u64,
    /// When opening `target_path` fails, permit writing to a fallback regular
    /// file instead of returning the error.
    pub allow_file_fallback: bool,
    /// When set, the run fails instead of falling back to the synchronous
    /// (non-io_uring) write path.
    pub require_io_uring: bool,
}
19
/// Result summary of one module-D run.
#[derive(Debug, Clone)]
pub struct ModuleDStats {
    /// Logical payload bytes written (tail padding is not counted).
    pub bytes_written: u64,
    /// Number of grouped writes that completed successfully.
    pub commits: u64,
    /// Wall-clock duration of the run in milliseconds.
    pub elapsed_ms: f64,
    /// `bytes_written` expressed in MiB (1024 * 1024 bytes) per second.
    pub throughput_mb_s: f64,
    /// Share of CPU time spent in iowait during the run, derived from
    /// `/proc/stat` deltas, in percent.
    pub io_wait_pct: f64,
    /// Label of the target kind that was opened (`"block"` or `"file-fallback"`).
    pub mode: String,
    /// Number of writes rejected by the alignment check.
    pub alignment_violations: u64,
    /// Number of writes that returned an I/O error.
    pub write_errors: u64,
    /// Path that was actually written (the fallback path when one was used).
    pub target_path: PathBuf,
}

impl ModuleDStats {
    /// Serializes the stats as a single-line JSON object.
    ///
    /// Field order and the three-decimal float formatting match the historical
    /// output. Two cases that used to yield invalid JSON are handled:
    ///
    /// * `mode` and `target_path` are escaped, so quotes, backslashes and
    ///   control characters can no longer break out of the string literal;
    /// * non-finite floats (for example an infinite throughput when the
    ///   elapsed time is zero) are emitted as `null`, since JSON has no
    ///   `inf`/`NaN` tokens.
    pub fn to_json(&self) -> String {
        format!(
            "{{\"module\":\"D\",\"bytes_written\":{},\"commits\":{},\"elapsed_ms\":{},\"throughput_mb_s\":{},\"io_wait_pct\":{},\"mode\":\"{}\",\"alignment_violations\":{},\"write_errors\":{},\"target_path\":\"{}\"}}",
            self.bytes_written,
            self.commits,
            Self::json_number(self.elapsed_ms),
            Self::json_number(self.throughput_mb_s),
            Self::json_number(self.io_wait_pct),
            Self::json_escape(&self.mode),
            self.alignment_violations,
            self.write_errors,
            Self::json_escape(&self.target_path.display().to_string()),
        )
    }

    /// Formats a float with three decimals, or `null` when it is not finite.
    fn json_number(value: f64) -> String {
        if value.is_finite() {
            format!("{:.3}", value)
        } else {
            "null".to_string()
        }
    }

    /// Escapes `raw` so it can be placed between double quotes in JSON.
    fn json_escape(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                // Remaining control characters must use the \uXXXX form.
                c if (c as u32) < 0x20 => escaped.push_str(&format!("\\u{:04x}", c as u32)),
                c => escaped.push(c),
            }
        }
        escaped
    }
}
49
50#[cfg(target_os = "linux")]
51mod linux {
52 use std::fs;
53 use std::fs::OpenOptions as StdOpenOptions;
54 use std::io;
55 use std::io::ErrorKind;
56 use std::os::unix::fs::FileExt;
57 use std::os::unix::fs::OpenOptionsExt;
58 use std::path::{Path, PathBuf};
59 use std::sync::mpsc;
60 use std::time::Instant;
61
62 use anyhow::{Context, Result};
63 use nix::libc;
64 use tokio::sync::mpsc as tokio_mpsc;
65 use tokio_uring::buf::BoundedBuf;
66 use tokio_uring::fs::{File, OpenOptions as TokioOpenOptions};
67
68 use crate::module_d::{ModuleDConfig, ModuleDStats, ALIGNMENT};
69
/// Snapshot of the aggregate CPU counters read from the first line of `/proc/stat`.
70 #[derive(Debug, Clone, Copy)]
71 struct CpuSample {
// Sum of the counters parsed from the aggregate `cpu` line (at most the first eight fields).
72 total: u64,
// Fifth counter on the `cpu` line, which this module treats as the iowait figure.
73 iowait: u64,
74 }
75
/// One unit of generated data sent from a producer to the writer over a channel.
76 struct ProducerChunk {
// Bytes filled in by `fill_payload`; producers size it to at most `request_bytes`.
77 payload: Vec<u8>,
78 }
79
80 pub(super) fn run(config: ModuleDConfig) -> Result<ModuleDStats> {
81 validate_config(&config)?;
82 if config.require_io_uring && !io_uring_probe() {
83 anyhow::bail!("io_uring is required but unavailable in this environment");
84 }
85 let async_config = config.clone();
86 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
87 tokio_uring::start(async move { run_async(async_config).await })
88 })) {
89 Ok(Ok(stats)) => Ok(stats),
90 Ok(Err(err)) => {
91 if should_fallback_to_sync(&err.to_string()) {
92 if config.require_io_uring {
93 return Err(err).context("io_uring required; refusing sync fallback");
94 }
95 eprintln!(
96 "module-d: io_uring unavailable ({}); retrying with sync direct-io fallback",
97 err
98 );
99 run_sync(config)
100 } else {
101 Err(err)
102 }
103 }
104 Err(payload) => {
105 let panic_message = panic_payload_to_string(&payload);
106 if should_fallback_to_sync(&panic_message) {
107 if config.require_io_uring {
108 anyhow::bail!(
109 "io_uring required; runtime init failed with panic: {}",
110 panic_message
111 );
112 }
113 eprintln!(
114 "module-d: io_uring unavailable ({}); retrying with sync direct-io fallback",
115 panic_message
116 );
117 run_sync(config)
118 } else {
119 std::panic::resume_unwind(payload);
120 }
121 }
122 }
123 }
124
125 async fn run_async(config: ModuleDConfig) -> Result<ModuleDStats> {
126 let cpu_before = read_cpu_sample().context("failed reading pre-run CPU sample")?;
127 let start = Instant::now();
128
129 let (file, mode, resolved_target) = open_target(&config).await.with_context(|| {
130 format!(
131 "failed opening target path {}",
132 config.target_path.display()
133 )
134 })?;
135
136 let request_bytes_u64 = config.request_bytes as u64;
137 let total_requests = config.total_bytes.div_ceil(request_bytes_u64);
138 let channel_capacity = config.producers.saturating_mul(2).max(4);
139 let (tx, mut rx) = tokio_mpsc::channel::<ProducerChunk>(channel_capacity);
140
141 for producer_id in 0..config.producers {
142 let tx = tx.clone();
143 let producer_count = config.producers as u64;
144 let total_bytes = config.total_bytes;
145 let request_bytes = config.request_bytes;
146 let seed = config.seed;
147 tokio_uring::spawn(async move {
148 let mut seq = producer_id as u64;
149 while seq < total_requests {
150 let offset = seq * request_bytes as u64;
151 let remaining = total_bytes.saturating_sub(offset);
152 if remaining == 0 {
153 break;
154 }
155 let chunk_bytes = remaining.min(request_bytes as u64) as usize;
156 let mut payload = vec![0_u8; chunk_bytes];
157 fill_payload(&mut payload, seed, producer_id as u64, seq);
158 if tx.send(ProducerChunk { payload }).await.is_err() {
159 break;
160 }
161 seq += producer_count;
162 }
163 });
164 }
165 drop(tx);
166
167 let mut pending = Vec::<u8>::with_capacity(config.group_bytes + config.request_bytes);
168 let mut write_buf = vec![0_u8; config.group_bytes + ALIGNMENT];
169 let write_buf_align_start = aligned_offset(write_buf.as_ptr() as usize, ALIGNMENT);
170
171 let mut file_offset = 0_u64;
172 let mut logical_bytes_written = 0_u64;
173 let mut commits = 0_u64;
174 let mut alignment_violations = 0_u64;
175 let mut write_errors = 0_u64;
176
177 while let Some(chunk) = rx.recv().await {
178 let mut idx = 0usize;
179 while idx < chunk.payload.len() {
180 let remaining_in_group = config.group_bytes - pending.len();
181 let take = remaining_in_group.min(chunk.payload.len() - idx);
182 pending.extend_from_slice(&chunk.payload[idx..idx + take]);
183 idx += take;
184
185 if pending.len() == config.group_bytes {
186 flush_pending(
187 &file,
188 &mut pending,
189 &mut write_buf,
190 write_buf_align_start,
191 &mut file_offset,
192 &mut logical_bytes_written,
193 &mut commits,
194 &mut alignment_violations,
195 &mut write_errors,
196 false,
197 )
198 .await?;
199 }
200 }
201 }
202
203 if !pending.is_empty() {
204 flush_pending(
205 &file,
206 &mut pending,
207 &mut write_buf,
208 write_buf_align_start,
209 &mut file_offset,
210 &mut logical_bytes_written,
211 &mut commits,
212 &mut alignment_violations,
213 &mut write_errors,
214 true,
215 )
216 .await?;
217 }
218
219 file.sync_data()
220 .await
221 .context("module-d sync_data failed")?;
222 file.close().await.context("module-d close failed")?;
223
224 if logical_bytes_written != config.total_bytes {
225 anyhow::bail!(
226 "module-d byte mismatch: expected {}, wrote {}",
227 config.total_bytes,
228 logical_bytes_written
229 );
230 }
231
232 let elapsed = start.elapsed();
233 let elapsed_ms = elapsed.as_secs_f64() * 1_000.0;
234 let throughput_mb_s =
235 (logical_bytes_written as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
236
237 let cpu_after = read_cpu_sample().context("failed reading post-run CPU sample")?;
238 let io_wait_pct = compute_iowait_pct(cpu_before, cpu_after);
239
240 Ok(ModuleDStats {
241 bytes_written: logical_bytes_written,
242 commits,
243 elapsed_ms,
244 throughput_mb_s,
245 io_wait_pct,
246 mode,
247 alignment_violations,
248 write_errors,
249 target_path: resolved_target,
250 })
251 }
252
253 fn run_sync(config: ModuleDConfig) -> Result<ModuleDStats> {
254 let cpu_before = read_cpu_sample().context("failed reading pre-run CPU sample")?;
255 let start = Instant::now();
256
257 let (file, mode, resolved_target) = open_target_sync(&config).with_context(|| {
258 format!(
259 "failed opening target path {}",
260 config.target_path.display()
261 )
262 })?;
263
264 let request_bytes_u64 = config.request_bytes as u64;
265 let total_requests = config.total_bytes.div_ceil(request_bytes_u64);
266 let channel_capacity = config.producers.saturating_mul(2).max(4);
267 let (tx, rx) = mpsc::sync_channel::<ProducerChunk>(channel_capacity);
268
269 let mut producer_handles = Vec::with_capacity(config.producers);
270 for producer_id in 0..config.producers {
271 let tx = tx.clone();
272 let producer_count = config.producers as u64;
273 let total_bytes = config.total_bytes;
274 let request_bytes = config.request_bytes;
275 let seed = config.seed;
276 producer_handles.push(std::thread::spawn(move || {
277 let mut seq = producer_id as u64;
278 while seq < total_requests {
279 let offset = seq * request_bytes as u64;
280 let remaining = total_bytes.saturating_sub(offset);
281 if remaining == 0 {
282 break;
283 }
284 let chunk_bytes = remaining.min(request_bytes as u64) as usize;
285 let mut payload = vec![0_u8; chunk_bytes];
286 fill_payload(&mut payload, seed, producer_id as u64, seq);
287 if tx.send(ProducerChunk { payload }).is_err() {
288 break;
289 }
290 seq += producer_count;
291 }
292 }));
293 }
294 drop(tx);
295
296 let mut pending = Vec::<u8>::with_capacity(config.group_bytes + config.request_bytes);
297 let mut write_buf = vec![0_u8; config.group_bytes + ALIGNMENT];
298 let write_buf_align_start = aligned_offset(write_buf.as_ptr() as usize, ALIGNMENT);
299
300 let mut file_offset = 0_u64;
301 let mut logical_bytes_written = 0_u64;
302 let mut commits = 0_u64;
303 let mut alignment_violations = 0_u64;
304 let mut write_errors = 0_u64;
305
306 for chunk in rx {
307 let mut idx = 0usize;
308 while idx < chunk.payload.len() {
309 let remaining_in_group = config.group_bytes - pending.len();
310 let take = remaining_in_group.min(chunk.payload.len() - idx);
311 pending.extend_from_slice(&chunk.payload[idx..idx + take]);
312 idx += take;
313
314 if pending.len() == config.group_bytes {
315 flush_pending_sync(
316 &file,
317 &mut pending,
318 &mut write_buf,
319 write_buf_align_start,
320 &mut file_offset,
321 &mut logical_bytes_written,
322 &mut commits,
323 &mut alignment_violations,
324 &mut write_errors,
325 false,
326 )?;
327 }
328 }
329 }
330
331 if !pending.is_empty() {
332 flush_pending_sync(
333 &file,
334 &mut pending,
335 &mut write_buf,
336 write_buf_align_start,
337 &mut file_offset,
338 &mut logical_bytes_written,
339 &mut commits,
340 &mut alignment_violations,
341 &mut write_errors,
342 true,
343 )?;
344 }
345
346 for handle in producer_handles {
347 if handle.join().is_err() {
348 anyhow::bail!("producer thread panicked in module-d sync fallback");
349 }
350 }
351
352 file.sync_data()
353 .context("module-d sync_data failed (sync fallback)")?;
354
355 if logical_bytes_written != config.total_bytes {
356 anyhow::bail!(
357 "module-d byte mismatch: expected {}, wrote {}",
358 config.total_bytes,
359 logical_bytes_written
360 );
361 }
362
363 let elapsed = start.elapsed();
364 let elapsed_ms = elapsed.as_secs_f64() * 1_000.0;
365 let throughput_mb_s =
366 (logical_bytes_written as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
367
368 let cpu_after = read_cpu_sample().context("failed reading post-run CPU sample")?;
369 let io_wait_pct = compute_iowait_pct(cpu_before, cpu_after);
370
371 Ok(ModuleDStats {
372 bytes_written: logical_bytes_written,
373 commits,
374 elapsed_ms,
375 throughput_mb_s,
376 io_wait_pct,
377 mode,
378 alignment_violations,
379 write_errors,
380 target_path: resolved_target,
381 })
382 }
383
384 fn flush_pending_sync(
385 file: &std::fs::File,
386 pending: &mut Vec<u8>,
387 write_buf: &mut Vec<u8>,
388 write_buf_align_start: usize,
389 file_offset: &mut u64,
390 logical_bytes_written: &mut u64,
391 commits: &mut u64,
392 alignment_violations: &mut u64,
393 write_errors: &mut u64,
394 pad_tail: bool,
395 ) -> Result<()> {
396 let logical_len = pending.len();
397 let physical_len = if pad_tail {
398 align_up(logical_len, ALIGNMENT)
399 } else {
400 logical_len
401 };
402
403 if physical_len == 0 {
404 pending.clear();
405 return Ok(());
406 }
407
408 let ptr = write_buf.as_ptr() as usize + write_buf_align_start;
409 if ptr % ALIGNMENT != 0
410 || physical_len % ALIGNMENT != 0
411 || *file_offset % ALIGNMENT as u64 != 0
412 {
413 *alignment_violations += 1;
414 anyhow::bail!(
415 "unaligned write detected: ptr_mod={}, physical_len_mod={}, offset_mod={}",
416 ptr % ALIGNMENT,
417 physical_len % ALIGNMENT,
418 *file_offset % ALIGNMENT as u64
419 );
420 }
421
422 let needed = write_buf_align_start + physical_len;
423 if write_buf.len() < needed {
424 write_buf.resize(needed, 0);
425 }
426
427 write_buf[write_buf_align_start..write_buf_align_start + logical_len]
428 .copy_from_slice(pending.as_slice());
429 if physical_len > logical_len {
430 write_buf[write_buf_align_start + logical_len..write_buf_align_start + physical_len]
431 .fill(0);
432 }
433
434 let aligned = &write_buf[write_buf_align_start..write_buf_align_start + physical_len];
435 if let Err(err) = write_all_at(file, aligned, *file_offset) {
436 *write_errors += 1;
437 return Err(err).context("module-d write_at failed (sync fallback)");
438 }
439
440 *file_offset += physical_len as u64;
441 *logical_bytes_written += logical_len as u64;
442 *commits += 1;
443 pending.clear();
444
445 Ok(())
446 }
447
/// Writes all of `buf` to `file` starting at `offset`, looping over short
/// writes.
///
/// A write interrupted by a signal (`ErrorKind::Interrupted`) is retried
/// rather than reported, matching the standard library's `write_all*` helpers.
///
/// # Errors
/// Returns `ErrorKind::WriteZero` if a write reports zero bytes written while
/// data remains, or the first non-interrupt I/O error.
fn write_all_at(file: &std::fs::File, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
    while !buf.is_empty() {
        match file.write_at(buf, offset) {
            Ok(0) => {
                return Err(io::Error::new(
                    ErrorKind::WriteZero,
                    "write_at returned 0 bytes",
                ));
            }
            Ok(written) => {
                buf = &buf[written..];
                offset = offset.saturating_add(written as u64);
            }
            // Nothing was written; simply try the same range again.
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
462
463 async fn flush_pending(
464 file: &File,
465 pending: &mut Vec<u8>,
466 write_buf: &mut Vec<u8>,
467 write_buf_align_start: usize,
468 file_offset: &mut u64,
469 logical_bytes_written: &mut u64,
470 commits: &mut u64,
471 alignment_violations: &mut u64,
472 write_errors: &mut u64,
473 pad_tail: bool,
474 ) -> Result<()> {
475 let logical_len = pending.len();
476 let physical_len = if pad_tail {
477 align_up(logical_len, ALIGNMENT)
478 } else {
479 logical_len
480 };
481
482 if physical_len == 0 {
483 pending.clear();
484 return Ok(());
485 }
486
487 let ptr = write_buf.as_ptr() as usize + write_buf_align_start;
488 if ptr % ALIGNMENT != 0
489 || physical_len % ALIGNMENT != 0
490 || *file_offset % ALIGNMENT as u64 != 0
491 {
492 *alignment_violations += 1;
493 anyhow::bail!(
494 "unaligned write detected: ptr_mod={}, physical_len_mod={}, offset_mod={}",
495 ptr % ALIGNMENT,
496 physical_len % ALIGNMENT,
497 *file_offset % ALIGNMENT as u64
498 );
499 }
500
501 let needed = write_buf_align_start + physical_len;
502 if write_buf.len() < needed {
503 write_buf.resize(needed, 0);
504 }
505
506 write_buf[write_buf_align_start..write_buf_align_start + logical_len]
507 .copy_from_slice(pending.as_slice());
508 if physical_len > logical_len {
509 write_buf[write_buf_align_start + logical_len..write_buf_align_start + physical_len]
510 .fill(0);
511 }
512
513 let mut owned = std::mem::take(write_buf);
514 let slice = owned.slice(write_buf_align_start..write_buf_align_start + physical_len);
515 let (result, returned) = file.write_all_at(slice, *file_offset).await;
516 owned = returned.into_inner();
517 *write_buf = owned;
518
519 if let Err(err) = result {
520 *write_errors += 1;
521 return Err(err).context("module-d write_all_at failed");
522 }
523
524 *file_offset += physical_len as u64;
525 *logical_bytes_written += logical_len as u64;
526 *commits += 1;
527 pending.clear();
528
529 Ok(())
530 }
531
532 async fn open_target(config: &ModuleDConfig) -> Result<(File, String, PathBuf)> {
533 match open_direct(&config.target_path, false).await {
534 Ok(file) => Ok((file, "block".to_string(), config.target_path.clone())),
535 Err(primary_err) => {
536 if !config.allow_file_fallback {
537 return Err(primary_err).with_context(|| {
538 format!(
539 "opening {} as raw block target failed and fallback disabled",
540 config.target_path.display()
541 )
542 });
543 }
544
545 let fallback_path = fallback_path_for(&config.target_path);
546 if let Some(parent) = fallback_path.parent() {
547 if !parent.as_os_str().is_empty() {
548 fs::create_dir_all(parent).with_context(|| {
549 format!("failed to create fallback parent {}", parent.display())
550 })?;
551 }
552 }
553
554 let file = open_direct(&fallback_path, true).await.with_context(|| {
555 format!(
556 "failed opening fallback sparse file target {}",
557 fallback_path.display()
558 )
559 })?;
560
561 Ok((file, "file-fallback".to_string(), fallback_path))
562 }
563 }
564 }
565
566 fn open_target_sync(config: &ModuleDConfig) -> Result<(std::fs::File, String, PathBuf)> {
567 match open_direct_sync(&config.target_path, false) {
568 Ok(file) => Ok((file, "block".to_string(), config.target_path.clone())),
569 Err(primary_err) => {
570 if !config.allow_file_fallback {
571 return Err(primary_err).with_context(|| {
572 format!(
573 "opening {} as raw block target failed and fallback disabled",
574 config.target_path.display()
575 )
576 });
577 }
578
579 let fallback_path = fallback_path_for(&config.target_path);
580 if let Some(parent) = fallback_path.parent() {
581 if !parent.as_os_str().is_empty() {
582 fs::create_dir_all(parent).with_context(|| {
583 format!("failed to create fallback parent {}", parent.display())
584 })?;
585 }
586 }
587
588 let file = open_direct_sync(&fallback_path, true).with_context(|| {
589 format!(
590 "failed opening fallback sparse file target {}",
591 fallback_path.display()
592 )
593 })?;
594
595 Ok((file, "file-fallback".to_string(), fallback_path))
596 }
597 }
598 }
599
600 async fn open_direct(path: &Path, create_file: bool) -> Result<File> {
601 let mut opts = TokioOpenOptions::new();
602 opts.write(true);
603 if create_file {
604 opts.create(true).truncate(true).mode(0o644);
605 }
606 opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
607 opts.open(path)
608 .await
609 .with_context(|| format!("open_direct failed for {}", path.display()))
610 }
611
612 fn open_direct_sync(path: &Path, create_file: bool) -> Result<std::fs::File> {
613 let mut opts = StdOpenOptions::new();
614 opts.write(true);
615 if create_file {
616 opts.create(true).truncate(true).mode(0o644);
617 }
618 opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
619 opts.open(path)
620 .with_context(|| format!("open_direct failed for {}", path.display()))
621 }
622
/// Chooses the regular-file path used when `target` itself cannot be opened.
///
/// Targets under `/dev` map to a fixed file in `/tmp`; any other target keeps
/// its location and gets its extension replaced by `direct.bin`.
fn fallback_path_for(target: &Path) -> PathBuf {
    let is_device_path = target.starts_with("/dev");
    if !is_device_path {
        return target.with_extension("direct.bin");
    }
    PathBuf::from("/tmp/tracer-bullet-module-d-direct.bin")
}
630
/// Decides from a failure message whether the io_uring attempt failed in a
/// way that warrants retrying on the synchronous path.
///
/// The message qualifies when it contains any of the known marker substrings.
fn should_fallback_to_sync(message: &str) -> bool {
    const MARKERS: [&str; 3] = ["Operation not permitted", "io_uring", "tokio-uring"];
    MARKERS.iter().any(|marker| message.contains(*marker))
}
636
/// Reports whether an io_uring runtime can be started here; exposes `io_uring_probe` to the parent module.
637 pub(super) fn io_uring_available() -> bool {
638 io_uring_probe()
639 }
640
641 fn io_uring_probe() -> bool {
642 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
643 tokio_uring::start(async { Ok::<(), anyhow::Error>(()) })
644 }))
645 .is_ok()
646 }
647
/// Extracts a readable message from a caught panic payload.
///
/// Handles the two payload types produced by `panic!` (`&str` and `String`);
/// anything else yields a fixed placeholder.
fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_string())
}
657
/// Fills `buffer` with deterministic pseudo-random bytes.
///
/// The generator state is derived from `seed`, `producer_id` and `sequence`,
/// then advanced once per byte with a 64-bit xorshift step (shifts 13, 7, 17);
/// each output byte is bits 24..32 of the state.
///
/// A derived state of zero is a fixed point of the xorshift step and would
/// make the whole payload zeros (this happens, for example, for seed 0,
/// producer 0, sequence 0). In that case a fixed non-zero constant is used
/// instead. All other inputs produce the same bytes as before.
fn fill_payload(buffer: &mut [u8], seed: u64, producer_id: u64, sequence: u64) {
    const ZERO_STATE_REPLACEMENT: u64 = 0x9E37_79B9_7F4A_7C15;

    let mixed = seed
        ^ producer_id.wrapping_mul(0x9E37_79B9_7F4A_7C15)
        ^ sequence.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let mut state = if mixed == 0 {
        ZERO_STATE_REPLACEMENT
    } else {
        mixed
    };

    for byte in buffer.iter_mut() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        *byte = (state >> 24) as u8;
    }
}
670
671 fn read_cpu_sample() -> Result<CpuSample> {
672 let stat = fs::read_to_string("/proc/stat").context("failed to read /proc/stat")?;
673 let line = stat
674 .lines()
675 .next()
676 .context("/proc/stat did not contain cpu header")?;
677
678 let mut fields = line.split_whitespace();
679 let cpu_tag = fields.next().context("missing cpu tag in /proc/stat")?;
680 if cpu_tag != "cpu" {
681 anyhow::bail!("unexpected cpu tag in /proc/stat: {}", cpu_tag);
682 }
683
684 let mut values = Vec::with_capacity(8);
685 for field in fields.take(8) {
686 values.push(
687 field
688 .parse::<u64>()
689 .with_context(|| format!("failed parsing /proc/stat field: {}", field))?,
690 );
691 }
692
693 if values.len() < 5 {
694 anyhow::bail!("/proc/stat cpu line missing expected counters");
695 }
696
697 let total = values.iter().copied().sum::<u64>();
698 let iowait = values[4];
699
700 Ok(CpuSample { total, iowait })
701 }
702
703 fn compute_iowait_pct(before: CpuSample, after: CpuSample) -> f64 {
704 let total_delta = after.total.saturating_sub(before.total);
705 if total_delta == 0 {
706 return 0.0;
707 }
708
709 let iowait_delta = after.iowait.saturating_sub(before.iowait);
710 (iowait_delta as f64 / total_delta as f64) * 100.0
711 }
712
713 fn validate_config(config: &ModuleDConfig) -> Result<()> {
714 if config.total_bytes == 0 {
715 anyhow::bail!("total_bytes must be > 0");
716 }
717 if config.group_bytes == 0 || config.group_bytes % ALIGNMENT != 0 {
718 anyhow::bail!("group_bytes must be > 0 and aligned to {} bytes", ALIGNMENT);
719 }
720 if config.request_bytes == 0 || config.request_bytes % ALIGNMENT != 0 {
721 anyhow::bail!(
722 "request_bytes must be > 0 and aligned to {} bytes",
723 ALIGNMENT
724 );
725 }
726 if config.group_bytes < config.request_bytes {
727 anyhow::bail!("group_bytes must be >= request_bytes");
728 }
729 if config.producers == 0 {
730 anyhow::bail!("producers must be > 0");
731 }
732 if config.total_bytes % ALIGNMENT as u64 != 0 {
733 anyhow::bail!(
734 "total_bytes must be aligned to {} bytes for O_DIRECT",
735 ALIGNMENT
736 );
737 }
738
739 Ok(())
740 }
741
/// Rounds `value` up to the nearest multiple of `align`; values that are
/// already a multiple are returned unchanged.
///
/// # Panics
/// Panics if `align` is zero.
fn align_up(value: usize, align: usize) -> usize {
    match value % align {
        0 => value,
        remainder => value + (align - remainder),
    }
}
749
/// Returns how many bytes must be added to `ptr` to reach the next multiple
/// of `align` (zero when `ptr` is already a multiple).
///
/// # Panics
/// Panics if `align` is zero.
fn aligned_offset(ptr: usize, align: usize) -> usize {
    match ptr % align {
        0 => 0,
        misalignment => align - misalignment,
    }
}
753
// Unit tests for the pure helpers of this module: alignment arithmetic and config validation.
754 #[cfg(test)]
755 mod tests {
756 use super::*;
757
// `align_up` keeps already-aligned values and rounds everything else up to the next 4096 multiple.
758 #[test]
759 fn alignment_math_is_correct() {
760 assert_eq!(align_up(4096, 4096), 4096);
761 assert_eq!(align_up(4097, 4096), 8192);
762 assert_eq!(align_up(8191, 4096), 8192);
763 }
764
// A `total_bytes` that is not a multiple of the alignment must be rejected, and the error must name the field.
765 #[test]
766 fn validates_direct_io_constraints() {
767 let err = validate_config(&ModuleDConfig {
768 target_path: PathBuf::from("/tmp/x"),
// 123 is deliberately unaligned; every other field is valid so only this check can fire.
769 total_bytes: 123,
770 group_bytes: 16 * 1024 * 1024,
771 request_bytes: 256 * 1024,
772 producers: 1,
773 seed: 1,
774 allow_file_fallback: true,
775 require_io_uring: false,
776 })
777 .expect_err("unaligned total_bytes should fail");
778 assert!(err.to_string().contains("total_bytes"));
779 }
780
// `aligned_offset` returns the distance to the next boundary, and zero when already on one.
781 #[test]
782 fn aligned_offset_returns_expected_values() {
783 assert_eq!(aligned_offset(0, 4096), 0);
784 assert_eq!(aligned_offset(1, 4096), 4095);
785 assert_eq!(aligned_offset(4095, 4096), 1);
786 assert_eq!(aligned_offset(4096, 4096), 0);
787 }
788 }
789}
790
/// Runs module D with the given configuration (Linux build): delegates to `linux::run`.
791#[cfg(target_os = "linux")]
792pub fn run(config: ModuleDConfig) -> Result<ModuleDStats> {
793 linux::run(config)
794}
795
/// Reports whether io_uring can be used in this environment (Linux build): delegates to `linux::io_uring_available`.
796#[cfg(target_os = "linux")]
797pub fn io_uring_available() -> bool {
798 linux::io_uring_available()
799}
800
/// Non-Linux build of `run`: always returns an error, because the module depends on io_uring and `O_DIRECT`.
801#[cfg(not(target_os = "linux"))]
802pub fn run(_config: ModuleDConfig) -> Result<ModuleDStats> {
803 anyhow::bail!("module-d requires Linux (io_uring + O_DIRECT)")
804}
805
/// Non-Linux build of `io_uring_available`: io_uring is never available, so this always returns `false`.
806#[cfg(not(target_os = "linux"))]
807pub fn io_uring_available() -> bool {
808 false
809}