Skip to main content

nexus_core/
module_d.rs

1use std::path::PathBuf;
2
3use anyhow::Result;
4
5#[cfg(target_os = "linux")]
6const ALIGNMENT: usize = 4096;
7
8#[derive(Debug, Clone)]
9pub struct ModuleDConfig {
10    pub target_path: PathBuf,
11    pub total_bytes: u64,
12    pub group_bytes: usize,
13    pub request_bytes: usize,
14    pub producers: usize,
15    pub seed: u64,
16    pub allow_file_fallback: bool,
17    pub require_io_uring: bool,
18}
19
20#[derive(Debug, Clone)]
21pub struct ModuleDStats {
22    pub bytes_written: u64,
23    pub commits: u64,
24    pub elapsed_ms: f64,
25    pub throughput_mb_s: f64,
26    pub io_wait_pct: f64,
27    pub mode: String,
28    pub alignment_violations: u64,
29    pub write_errors: u64,
30    pub target_path: PathBuf,
31}
32
33impl ModuleDStats {
34    pub fn to_json(&self) -> String {
35        format!(
36            "{{\"module\":\"D\",\"bytes_written\":{},\"commits\":{},\"elapsed_ms\":{:.3},\"throughput_mb_s\":{:.3},\"io_wait_pct\":{:.3},\"mode\":\"{}\",\"alignment_violations\":{},\"write_errors\":{},\"target_path\":\"{}\"}}",
37            self.bytes_written,
38            self.commits,
39            self.elapsed_ms,
40            self.throughput_mb_s,
41            self.io_wait_pct,
42            self.mode,
43            self.alignment_violations,
44            self.write_errors,
45            self.target_path.display()
46        )
47    }
48}
49
50#[cfg(target_os = "linux")]
51mod linux {
52    use std::fs;
53    use std::fs::OpenOptions as StdOpenOptions;
54    use std::io;
55    use std::io::ErrorKind;
56    use std::os::unix::fs::FileExt;
57    use std::os::unix::fs::OpenOptionsExt;
58    use std::path::{Path, PathBuf};
59    use std::sync::mpsc;
60    use std::time::Instant;
61
62    use anyhow::{Context, Result};
63    use nix::libc;
64    use tokio::sync::mpsc as tokio_mpsc;
65    use tokio_uring::buf::BoundedBuf;
66    use tokio_uring::fs::{File, OpenOptions as TokioOpenOptions};
67
68    use crate::module_d::{ModuleDConfig, ModuleDStats, ALIGNMENT};
69
70    #[derive(Debug, Clone, Copy)]
71    struct CpuSample {
72        total: u64,
73        iowait: u64,
74    }
75
76    struct ProducerChunk {
77        payload: Vec<u8>,
78    }
79
80    pub(super) fn run(config: ModuleDConfig) -> Result<ModuleDStats> {
81        validate_config(&config)?;
82        if config.require_io_uring && !io_uring_probe() {
83            anyhow::bail!("io_uring is required but unavailable in this environment");
84        }
85        let async_config = config.clone();
86        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
87            tokio_uring::start(async move { run_async(async_config).await })
88        })) {
89            Ok(Ok(stats)) => Ok(stats),
90            Ok(Err(err)) => {
91                if should_fallback_to_sync(&err.to_string()) {
92                    if config.require_io_uring {
93                        return Err(err).context("io_uring required; refusing sync fallback");
94                    }
95                    eprintln!(
96                        "module-d: io_uring unavailable ({}); retrying with sync direct-io fallback",
97                        err
98                    );
99                    run_sync(config)
100                } else {
101                    Err(err)
102                }
103            }
104            Err(payload) => {
105                let panic_message = panic_payload_to_string(&payload);
106                if should_fallback_to_sync(&panic_message) {
107                    if config.require_io_uring {
108                        anyhow::bail!(
109                            "io_uring required; runtime init failed with panic: {}",
110                            panic_message
111                        );
112                    }
113                    eprintln!(
114                        "module-d: io_uring unavailable ({}); retrying with sync direct-io fallback",
115                        panic_message
116                    );
117                    run_sync(config)
118                } else {
119                    std::panic::resume_unwind(payload);
120                }
121            }
122        }
123    }
124
125    async fn run_async(config: ModuleDConfig) -> Result<ModuleDStats> {
126        let cpu_before = read_cpu_sample().context("failed reading pre-run CPU sample")?;
127        let start = Instant::now();
128
129        let (file, mode, resolved_target) = open_target(&config).await.with_context(|| {
130            format!(
131                "failed opening target path {}",
132                config.target_path.display()
133            )
134        })?;
135
136        let request_bytes_u64 = config.request_bytes as u64;
137        let total_requests = config.total_bytes.div_ceil(request_bytes_u64);
138        let channel_capacity = config.producers.saturating_mul(2).max(4);
139        let (tx, mut rx) = tokio_mpsc::channel::<ProducerChunk>(channel_capacity);
140
141        for producer_id in 0..config.producers {
142            let tx = tx.clone();
143            let producer_count = config.producers as u64;
144            let total_bytes = config.total_bytes;
145            let request_bytes = config.request_bytes;
146            let seed = config.seed;
147            tokio_uring::spawn(async move {
148                let mut seq = producer_id as u64;
149                while seq < total_requests {
150                    let offset = seq * request_bytes as u64;
151                    let remaining = total_bytes.saturating_sub(offset);
152                    if remaining == 0 {
153                        break;
154                    }
155                    let chunk_bytes = remaining.min(request_bytes as u64) as usize;
156                    let mut payload = vec![0_u8; chunk_bytes];
157                    fill_payload(&mut payload, seed, producer_id as u64, seq);
158                    if tx.send(ProducerChunk { payload }).await.is_err() {
159                        break;
160                    }
161                    seq += producer_count;
162                }
163            });
164        }
165        drop(tx);
166
167        let mut pending = Vec::<u8>::with_capacity(config.group_bytes + config.request_bytes);
168        let mut write_buf = vec![0_u8; config.group_bytes + ALIGNMENT];
169        let write_buf_align_start = aligned_offset(write_buf.as_ptr() as usize, ALIGNMENT);
170
171        let mut file_offset = 0_u64;
172        let mut logical_bytes_written = 0_u64;
173        let mut commits = 0_u64;
174        let mut alignment_violations = 0_u64;
175        let mut write_errors = 0_u64;
176
177        while let Some(chunk) = rx.recv().await {
178            let mut idx = 0usize;
179            while idx < chunk.payload.len() {
180                let remaining_in_group = config.group_bytes - pending.len();
181                let take = remaining_in_group.min(chunk.payload.len() - idx);
182                pending.extend_from_slice(&chunk.payload[idx..idx + take]);
183                idx += take;
184
185                if pending.len() == config.group_bytes {
186                    flush_pending(
187                        &file,
188                        &mut pending,
189                        &mut write_buf,
190                        write_buf_align_start,
191                        &mut file_offset,
192                        &mut logical_bytes_written,
193                        &mut commits,
194                        &mut alignment_violations,
195                        &mut write_errors,
196                        false,
197                    )
198                    .await?;
199                }
200            }
201        }
202
203        if !pending.is_empty() {
204            flush_pending(
205                &file,
206                &mut pending,
207                &mut write_buf,
208                write_buf_align_start,
209                &mut file_offset,
210                &mut logical_bytes_written,
211                &mut commits,
212                &mut alignment_violations,
213                &mut write_errors,
214                true,
215            )
216            .await?;
217        }
218
219        file.sync_data()
220            .await
221            .context("module-d sync_data failed")?;
222        file.close().await.context("module-d close failed")?;
223
224        if logical_bytes_written != config.total_bytes {
225            anyhow::bail!(
226                "module-d byte mismatch: expected {}, wrote {}",
227                config.total_bytes,
228                logical_bytes_written
229            );
230        }
231
232        let elapsed = start.elapsed();
233        let elapsed_ms = elapsed.as_secs_f64() * 1_000.0;
234        let throughput_mb_s =
235            (logical_bytes_written as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
236
237        let cpu_after = read_cpu_sample().context("failed reading post-run CPU sample")?;
238        let io_wait_pct = compute_iowait_pct(cpu_before, cpu_after);
239
240        Ok(ModuleDStats {
241            bytes_written: logical_bytes_written,
242            commits,
243            elapsed_ms,
244            throughput_mb_s,
245            io_wait_pct,
246            mode,
247            alignment_violations,
248            write_errors,
249            target_path: resolved_target,
250        })
251    }
252
253    fn run_sync(config: ModuleDConfig) -> Result<ModuleDStats> {
254        let cpu_before = read_cpu_sample().context("failed reading pre-run CPU sample")?;
255        let start = Instant::now();
256
257        let (file, mode, resolved_target) = open_target_sync(&config).with_context(|| {
258            format!(
259                "failed opening target path {}",
260                config.target_path.display()
261            )
262        })?;
263
264        let request_bytes_u64 = config.request_bytes as u64;
265        let total_requests = config.total_bytes.div_ceil(request_bytes_u64);
266        let channel_capacity = config.producers.saturating_mul(2).max(4);
267        let (tx, rx) = mpsc::sync_channel::<ProducerChunk>(channel_capacity);
268
269        let mut producer_handles = Vec::with_capacity(config.producers);
270        for producer_id in 0..config.producers {
271            let tx = tx.clone();
272            let producer_count = config.producers as u64;
273            let total_bytes = config.total_bytes;
274            let request_bytes = config.request_bytes;
275            let seed = config.seed;
276            producer_handles.push(std::thread::spawn(move || {
277                let mut seq = producer_id as u64;
278                while seq < total_requests {
279                    let offset = seq * request_bytes as u64;
280                    let remaining = total_bytes.saturating_sub(offset);
281                    if remaining == 0 {
282                        break;
283                    }
284                    let chunk_bytes = remaining.min(request_bytes as u64) as usize;
285                    let mut payload = vec![0_u8; chunk_bytes];
286                    fill_payload(&mut payload, seed, producer_id as u64, seq);
287                    if tx.send(ProducerChunk { payload }).is_err() {
288                        break;
289                    }
290                    seq += producer_count;
291                }
292            }));
293        }
294        drop(tx);
295
296        let mut pending = Vec::<u8>::with_capacity(config.group_bytes + config.request_bytes);
297        let mut write_buf = vec![0_u8; config.group_bytes + ALIGNMENT];
298        let write_buf_align_start = aligned_offset(write_buf.as_ptr() as usize, ALIGNMENT);
299
300        let mut file_offset = 0_u64;
301        let mut logical_bytes_written = 0_u64;
302        let mut commits = 0_u64;
303        let mut alignment_violations = 0_u64;
304        let mut write_errors = 0_u64;
305
306        for chunk in rx {
307            let mut idx = 0usize;
308            while idx < chunk.payload.len() {
309                let remaining_in_group = config.group_bytes - pending.len();
310                let take = remaining_in_group.min(chunk.payload.len() - idx);
311                pending.extend_from_slice(&chunk.payload[idx..idx + take]);
312                idx += take;
313
314                if pending.len() == config.group_bytes {
315                    flush_pending_sync(
316                        &file,
317                        &mut pending,
318                        &mut write_buf,
319                        write_buf_align_start,
320                        &mut file_offset,
321                        &mut logical_bytes_written,
322                        &mut commits,
323                        &mut alignment_violations,
324                        &mut write_errors,
325                        false,
326                    )?;
327                }
328            }
329        }
330
331        if !pending.is_empty() {
332            flush_pending_sync(
333                &file,
334                &mut pending,
335                &mut write_buf,
336                write_buf_align_start,
337                &mut file_offset,
338                &mut logical_bytes_written,
339                &mut commits,
340                &mut alignment_violations,
341                &mut write_errors,
342                true,
343            )?;
344        }
345
346        for handle in producer_handles {
347            if handle.join().is_err() {
348                anyhow::bail!("producer thread panicked in module-d sync fallback");
349            }
350        }
351
352        file.sync_data()
353            .context("module-d sync_data failed (sync fallback)")?;
354
355        if logical_bytes_written != config.total_bytes {
356            anyhow::bail!(
357                "module-d byte mismatch: expected {}, wrote {}",
358                config.total_bytes,
359                logical_bytes_written
360            );
361        }
362
363        let elapsed = start.elapsed();
364        let elapsed_ms = elapsed.as_secs_f64() * 1_000.0;
365        let throughput_mb_s =
366            (logical_bytes_written as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
367
368        let cpu_after = read_cpu_sample().context("failed reading post-run CPU sample")?;
369        let io_wait_pct = compute_iowait_pct(cpu_before, cpu_after);
370
371        Ok(ModuleDStats {
372            bytes_written: logical_bytes_written,
373            commits,
374            elapsed_ms,
375            throughput_mb_s,
376            io_wait_pct,
377            mode,
378            alignment_violations,
379            write_errors,
380            target_path: resolved_target,
381        })
382    }
383
384    fn flush_pending_sync(
385        file: &std::fs::File,
386        pending: &mut Vec<u8>,
387        write_buf: &mut Vec<u8>,
388        write_buf_align_start: usize,
389        file_offset: &mut u64,
390        logical_bytes_written: &mut u64,
391        commits: &mut u64,
392        alignment_violations: &mut u64,
393        write_errors: &mut u64,
394        pad_tail: bool,
395    ) -> Result<()> {
396        let logical_len = pending.len();
397        let physical_len = if pad_tail {
398            align_up(logical_len, ALIGNMENT)
399        } else {
400            logical_len
401        };
402
403        if physical_len == 0 {
404            pending.clear();
405            return Ok(());
406        }
407
408        let ptr = write_buf.as_ptr() as usize + write_buf_align_start;
409        if ptr % ALIGNMENT != 0
410            || physical_len % ALIGNMENT != 0
411            || *file_offset % ALIGNMENT as u64 != 0
412        {
413            *alignment_violations += 1;
414            anyhow::bail!(
415                "unaligned write detected: ptr_mod={}, physical_len_mod={}, offset_mod={}",
416                ptr % ALIGNMENT,
417                physical_len % ALIGNMENT,
418                *file_offset % ALIGNMENT as u64
419            );
420        }
421
422        let needed = write_buf_align_start + physical_len;
423        if write_buf.len() < needed {
424            write_buf.resize(needed, 0);
425        }
426
427        write_buf[write_buf_align_start..write_buf_align_start + logical_len]
428            .copy_from_slice(pending.as_slice());
429        if physical_len > logical_len {
430            write_buf[write_buf_align_start + logical_len..write_buf_align_start + physical_len]
431                .fill(0);
432        }
433
434        let aligned = &write_buf[write_buf_align_start..write_buf_align_start + physical_len];
435        if let Err(err) = write_all_at(file, aligned, *file_offset) {
436            *write_errors += 1;
437            return Err(err).context("module-d write_at failed (sync fallback)");
438        }
439
440        *file_offset += physical_len as u64;
441        *logical_bytes_written += logical_len as u64;
442        *commits += 1;
443        pending.clear();
444
445        Ok(())
446    }
447
448    fn write_all_at(file: &std::fs::File, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
449        while !buf.is_empty() {
450            let written = file.write_at(buf, offset)?;
451            if written == 0 {
452                return Err(io::Error::new(
453                    ErrorKind::WriteZero,
454                    "write_at returned 0 bytes",
455                ));
456            }
457            buf = &buf[written..];
458            offset = offset.saturating_add(written as u64);
459        }
460        Ok(())
461    }
462
463    async fn flush_pending(
464        file: &File,
465        pending: &mut Vec<u8>,
466        write_buf: &mut Vec<u8>,
467        write_buf_align_start: usize,
468        file_offset: &mut u64,
469        logical_bytes_written: &mut u64,
470        commits: &mut u64,
471        alignment_violations: &mut u64,
472        write_errors: &mut u64,
473        pad_tail: bool,
474    ) -> Result<()> {
475        let logical_len = pending.len();
476        let physical_len = if pad_tail {
477            align_up(logical_len, ALIGNMENT)
478        } else {
479            logical_len
480        };
481
482        if physical_len == 0 {
483            pending.clear();
484            return Ok(());
485        }
486
487        let ptr = write_buf.as_ptr() as usize + write_buf_align_start;
488        if ptr % ALIGNMENT != 0
489            || physical_len % ALIGNMENT != 0
490            || *file_offset % ALIGNMENT as u64 != 0
491        {
492            *alignment_violations += 1;
493            anyhow::bail!(
494                "unaligned write detected: ptr_mod={}, physical_len_mod={}, offset_mod={}",
495                ptr % ALIGNMENT,
496                physical_len % ALIGNMENT,
497                *file_offset % ALIGNMENT as u64
498            );
499        }
500
501        let needed = write_buf_align_start + physical_len;
502        if write_buf.len() < needed {
503            write_buf.resize(needed, 0);
504        }
505
506        write_buf[write_buf_align_start..write_buf_align_start + logical_len]
507            .copy_from_slice(pending.as_slice());
508        if physical_len > logical_len {
509            write_buf[write_buf_align_start + logical_len..write_buf_align_start + physical_len]
510                .fill(0);
511        }
512
513        let mut owned = std::mem::take(write_buf);
514        let slice = owned.slice(write_buf_align_start..write_buf_align_start + physical_len);
515        let (result, returned) = file.write_all_at(slice, *file_offset).await;
516        owned = returned.into_inner();
517        *write_buf = owned;
518
519        if let Err(err) = result {
520            *write_errors += 1;
521            return Err(err).context("module-d write_all_at failed");
522        }
523
524        *file_offset += physical_len as u64;
525        *logical_bytes_written += logical_len as u64;
526        *commits += 1;
527        pending.clear();
528
529        Ok(())
530    }
531
532    async fn open_target(config: &ModuleDConfig) -> Result<(File, String, PathBuf)> {
533        match open_direct(&config.target_path, false).await {
534            Ok(file) => Ok((file, "block".to_string(), config.target_path.clone())),
535            Err(primary_err) => {
536                if !config.allow_file_fallback {
537                    return Err(primary_err).with_context(|| {
538                        format!(
539                            "opening {} as raw block target failed and fallback disabled",
540                            config.target_path.display()
541                        )
542                    });
543                }
544
545                let fallback_path = fallback_path_for(&config.target_path);
546                if let Some(parent) = fallback_path.parent() {
547                    if !parent.as_os_str().is_empty() {
548                        fs::create_dir_all(parent).with_context(|| {
549                            format!("failed to create fallback parent {}", parent.display())
550                        })?;
551                    }
552                }
553
554                let file = open_direct(&fallback_path, true).await.with_context(|| {
555                    format!(
556                        "failed opening fallback sparse file target {}",
557                        fallback_path.display()
558                    )
559                })?;
560
561                Ok((file, "file-fallback".to_string(), fallback_path))
562            }
563        }
564    }
565
566    fn open_target_sync(config: &ModuleDConfig) -> Result<(std::fs::File, String, PathBuf)> {
567        match open_direct_sync(&config.target_path, false) {
568            Ok(file) => Ok((file, "block".to_string(), config.target_path.clone())),
569            Err(primary_err) => {
570                if !config.allow_file_fallback {
571                    return Err(primary_err).with_context(|| {
572                        format!(
573                            "opening {} as raw block target failed and fallback disabled",
574                            config.target_path.display()
575                        )
576                    });
577                }
578
579                let fallback_path = fallback_path_for(&config.target_path);
580                if let Some(parent) = fallback_path.parent() {
581                    if !parent.as_os_str().is_empty() {
582                        fs::create_dir_all(parent).with_context(|| {
583                            format!("failed to create fallback parent {}", parent.display())
584                        })?;
585                    }
586                }
587
588                let file = open_direct_sync(&fallback_path, true).with_context(|| {
589                    format!(
590                        "failed opening fallback sparse file target {}",
591                        fallback_path.display()
592                    )
593                })?;
594
595                Ok((file, "file-fallback".to_string(), fallback_path))
596            }
597        }
598    }
599
600    async fn open_direct(path: &Path, create_file: bool) -> Result<File> {
601        let mut opts = TokioOpenOptions::new();
602        opts.write(true);
603        if create_file {
604            opts.create(true).truncate(true).mode(0o644);
605        }
606        opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
607        opts.open(path)
608            .await
609            .with_context(|| format!("open_direct failed for {}", path.display()))
610    }
611
612    fn open_direct_sync(path: &Path, create_file: bool) -> Result<std::fs::File> {
613        let mut opts = StdOpenOptions::new();
614        opts.write(true);
615        if create_file {
616            opts.create(true).truncate(true).mode(0o644);
617        }
618        opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
619        opts.open(path)
620            .with_context(|| format!("open_direct failed for {}", path.display()))
621    }
622
623    fn fallback_path_for(target: &Path) -> PathBuf {
624        if target.starts_with("/dev") {
625            PathBuf::from("/tmp/tracer-bullet-module-d-direct.bin")
626        } else {
627            target.with_extension("direct.bin")
628        }
629    }
630
631    fn should_fallback_to_sync(message: &str) -> bool {
632        message.contains("Operation not permitted")
633            || message.contains("io_uring")
634            || message.contains("tokio-uring")
635    }
636
637    pub(super) fn io_uring_available() -> bool {
638        io_uring_probe()
639    }
640
641    fn io_uring_probe() -> bool {
642        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
643            tokio_uring::start(async { Ok::<(), anyhow::Error>(()) })
644        }))
645        .is_ok()
646    }
647
648    fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
649        if let Some(text) = payload.downcast_ref::<&str>() {
650            return (*text).to_string();
651        }
652        if let Some(text) = payload.downcast_ref::<String>() {
653            return text.clone();
654        }
655        "unknown panic payload".to_string()
656    }
657
658    fn fill_payload(buffer: &mut [u8], seed: u64, producer_id: u64, sequence: u64) {
659        let mut state = seed
660            ^ producer_id.wrapping_mul(0x9E37_79B9_7F4A_7C15)
661            ^ sequence.wrapping_mul(0xBF58_476D_1CE4_E5B9);
662
663        for byte in buffer.iter_mut() {
664            state ^= state << 13;
665            state ^= state >> 7;
666            state ^= state << 17;
667            *byte = (state >> 24) as u8;
668        }
669    }
670
671    fn read_cpu_sample() -> Result<CpuSample> {
672        let stat = fs::read_to_string("/proc/stat").context("failed to read /proc/stat")?;
673        let line = stat
674            .lines()
675            .next()
676            .context("/proc/stat did not contain cpu header")?;
677
678        let mut fields = line.split_whitespace();
679        let cpu_tag = fields.next().context("missing cpu tag in /proc/stat")?;
680        if cpu_tag != "cpu" {
681            anyhow::bail!("unexpected cpu tag in /proc/stat: {}", cpu_tag);
682        }
683
684        let mut values = Vec::with_capacity(8);
685        for field in fields.take(8) {
686            values.push(
687                field
688                    .parse::<u64>()
689                    .with_context(|| format!("failed parsing /proc/stat field: {}", field))?,
690            );
691        }
692
693        if values.len() < 5 {
694            anyhow::bail!("/proc/stat cpu line missing expected counters");
695        }
696
697        let total = values.iter().copied().sum::<u64>();
698        let iowait = values[4];
699
700        Ok(CpuSample { total, iowait })
701    }
702
703    fn compute_iowait_pct(before: CpuSample, after: CpuSample) -> f64 {
704        let total_delta = after.total.saturating_sub(before.total);
705        if total_delta == 0 {
706            return 0.0;
707        }
708
709        let iowait_delta = after.iowait.saturating_sub(before.iowait);
710        (iowait_delta as f64 / total_delta as f64) * 100.0
711    }
712
713    fn validate_config(config: &ModuleDConfig) -> Result<()> {
714        if config.total_bytes == 0 {
715            anyhow::bail!("total_bytes must be > 0");
716        }
717        if config.group_bytes == 0 || config.group_bytes % ALIGNMENT != 0 {
718            anyhow::bail!("group_bytes must be > 0 and aligned to {} bytes", ALIGNMENT);
719        }
720        if config.request_bytes == 0 || config.request_bytes % ALIGNMENT != 0 {
721            anyhow::bail!(
722                "request_bytes must be > 0 and aligned to {} bytes",
723                ALIGNMENT
724            );
725        }
726        if config.group_bytes < config.request_bytes {
727            anyhow::bail!("group_bytes must be >= request_bytes");
728        }
729        if config.producers == 0 {
730            anyhow::bail!("producers must be > 0");
731        }
732        if config.total_bytes % ALIGNMENT as u64 != 0 {
733            anyhow::bail!(
734                "total_bytes must be aligned to {} bytes for O_DIRECT",
735                ALIGNMENT
736            );
737        }
738
739        Ok(())
740    }
741
742    fn align_up(value: usize, align: usize) -> usize {
743        if value % align == 0 {
744            value
745        } else {
746            value + (align - (value % align))
747        }
748    }
749
750    fn aligned_offset(ptr: usize, align: usize) -> usize {
751        (align - (ptr % align)) % align
752    }
753
754    #[cfg(test)]
755    mod tests {
756        use super::*;
757
758        #[test]
759        fn alignment_math_is_correct() {
760            assert_eq!(align_up(4096, 4096), 4096);
761            assert_eq!(align_up(4097, 4096), 8192);
762            assert_eq!(align_up(8191, 4096), 8192);
763        }
764
765        #[test]
766        fn validates_direct_io_constraints() {
767            let err = validate_config(&ModuleDConfig {
768                target_path: PathBuf::from("/tmp/x"),
769                total_bytes: 123,
770                group_bytes: 16 * 1024 * 1024,
771                request_bytes: 256 * 1024,
772                producers: 1,
773                seed: 1,
774                allow_file_fallback: true,
775                require_io_uring: false,
776            })
777            .expect_err("unaligned total_bytes should fail");
778            assert!(err.to_string().contains("total_bytes"));
779        }
780
781        #[test]
782        fn aligned_offset_returns_expected_values() {
783            assert_eq!(aligned_offset(0, 4096), 0);
784            assert_eq!(aligned_offset(1, 4096), 4095);
785            assert_eq!(aligned_offset(4095, 4096), 1);
786            assert_eq!(aligned_offset(4096, 4096), 0);
787        }
788    }
789}
790
791#[cfg(target_os = "linux")]
792pub fn run(config: ModuleDConfig) -> Result<ModuleDStats> {
793    linux::run(config)
794}
795
796#[cfg(target_os = "linux")]
797pub fn io_uring_available() -> bool {
798    linux::io_uring_available()
799}
800
801#[cfg(not(target_os = "linux"))]
802pub fn run(_config: ModuleDConfig) -> Result<ModuleDStats> {
803    anyhow::bail!("module-d requires Linux (io_uring + O_DIRECT)")
804}
805
806#[cfg(not(target_os = "linux"))]
807pub fn io_uring_available() -> bool {
808    false
809}