Skip to main content

nexus_core/
module_c.rs

1use std::fs::{self, File};
2use std::io::{BufWriter, Write};
3use std::path::PathBuf;
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use memmap2::MmapOptions;
8
9use crate::alloc_counter;
10use crate::generated::{v1_generated, v2_generated};
11
/// Parameters for [`run_writer`].
#[derive(Clone, Debug)]
pub struct ModuleCWriteConfig {
    /// How many records to append; `run_writer` rejects `0`.
    pub records: u64,
    /// Destination WAL file. It is created (truncating any existing file) after
    /// any missing parent directories have been created.
    pub wal_path: PathBuf,
}
17
/// Parameters for [`run_reader`].
#[derive(Clone, Debug)]
pub struct ModuleCReadConfig {
    /// WAL file to scan.
    pub wal_path: PathBuf,
    /// When `Some(n)`, `run_reader` fails unless exactly `n` records were read.
    pub expect_records: Option<u64>,
}
23
/// Summary of one [`run_writer`] invocation.
#[derive(Debug, Clone)]
pub struct ModuleCWriteStats {
    /// Number of records appended to the WAL.
    pub records_written: u64,
    /// Wall-clock milliseconds spent encoding and writing records, including the final flush.
    pub elapsed_ms: f64,
    /// Size of the WAL file as reported by the filesystem after the writer was flushed.
    pub file_size_bytes: u64,
    /// Path of the WAL that was written.
    pub wal_path: PathBuf,
}

impl ModuleCWriteStats {
    /// Renders the stats as a single-line JSON object tagged with `"module":"C_WRITE"`.
    ///
    /// `elapsed_ms` is printed with three decimal places. `wal_path` is JSON-escaped so the
    /// output stays valid even for paths containing quotes, backslashes (e.g. Windows paths)
    /// or control characters.
    pub fn to_json(&self) -> String {
        format!(
            "{{\"module\":\"C_WRITE\",\"records_written\":{},\"elapsed_ms\":{:.3},\"file_size_bytes\":{},\"wal_path\":\"{}\"}}",
            self.records_written,
            self.elapsed_ms,
            self.file_size_bytes,
            escape_json_str(&self.wal_path.display().to_string())
        )
    }
}

/// Summary of one [`run_reader`] scan.
#[derive(Debug, Clone)]
pub struct ModuleCReadStats {
    /// Number of records parsed from the WAL.
    pub records_read: u64,
    /// Wall-clock milliseconds spent scanning the mapped WAL (excludes open/mmap).
    pub elapsed_ms: f64,
    /// `true` if every record reported `retain_until_date() == 0`.
    pub defaults_ok: bool,
    /// `true` if every decoded `bucket_name` pointed inside the memory-mapped WAL.
    pub zero_copy_ok: bool,
    /// Allocations counted by `alloc_counter` while the read loop ran.
    pub read_loop_allocations: u64,
    /// Path of the WAL that was scanned.
    pub wal_path: PathBuf,
}

impl ModuleCReadStats {
    /// Renders the stats as a single-line JSON object tagged with `"module":"C_READ"`.
    ///
    /// `elapsed_ms` is printed with three decimal places. `wal_path` is JSON-escaped so the
    /// output stays valid even for paths containing quotes, backslashes (e.g. Windows paths)
    /// or control characters.
    pub fn to_json(&self) -> String {
        format!(
            "{{\"module\":\"C_READ\",\"records_read\":{},\"elapsed_ms\":{:.3},\"defaults_ok\":{},\"zero_copy_ok\":{},\"read_loop_allocations\":{},\"wal_path\":\"{}\"}}",
            self.records_read,
            self.elapsed_ms,
            self.defaults_ok,
            self.zero_copy_ok,
            self.read_loop_allocations,
            escape_json_str(&self.wal_path.display().to_string())
        )
    }
}

/// Escapes `raw` for embedding between double quotes in a JSON document (RFC 8259 §7).
///
/// Quotes and backslashes are backslash-escaped; control characters below U+0020 use the
/// short forms `\n`, `\r`, `\t` where available and `\u00XX` otherwise.
fn escape_json_str(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
67
68pub fn run_writer(config: ModuleCWriteConfig) -> Result<ModuleCWriteStats> {
69    if config.records == 0 {
70        anyhow::bail!("records must be > 0");
71    }
72
73    if let Some(parent) = config.wal_path.parent() {
74        if !parent.as_os_str().is_empty() {
75            fs::create_dir_all(parent).with_context(|| {
76                format!("failed to create parent directory {}", parent.display())
77            })?;
78        }
79    }
80
81    let file = File::create(&config.wal_path)
82        .with_context(|| format!("failed to create WAL at {}", config.wal_path.display()))?;
83    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
84
85    let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);
86    let bucket_names: Vec<String> = (0..1024).map(|idx| format!("bucket-{idx:04}")).collect();
87
88    let start = Instant::now();
89    for i in 0..config.records {
90        builder.reset();
91
92        let bucket_name = &bucket_names[(i as usize) % bucket_names.len()];
93        let bucket_name_offset = builder.create_string(bucket_name);
94        let args = v1_generated::RecordV1Args {
95            object_id: i + 1,
96            bucket_name: Some(bucket_name_offset),
97            offset: i * 4096,
98        };
99
100        let record = v1_generated::RecordV1::create(&mut builder, &args);
101        v1_generated::finish_size_prefixed_record_v1_buffer(&mut builder, record);
102        writer
103            .write_all(builder.finished_data())
104            .with_context(|| format!("failed writing record {}", i))?;
105    }
106
107    writer.flush().context("failed flushing WAL writer")?;
108
109    let elapsed_ms = start.elapsed().as_secs_f64() * 1_000.0;
110    let file_size_bytes = fs::metadata(&config.wal_path)
111        .with_context(|| format!("failed to stat WAL {}", config.wal_path.display()))?
112        .len();
113
114    Ok(ModuleCWriteStats {
115        records_written: config.records,
116        elapsed_ms,
117        file_size_bytes,
118        wal_path: config.wal_path,
119    })
120}
121
122pub fn run_reader(config: ModuleCReadConfig) -> Result<ModuleCReadStats> {
123    let file = File::open(&config.wal_path)
124        .with_context(|| format!("failed to open WAL {}", config.wal_path.display()))?;
125    let mapped = unsafe { MmapOptions::new().map(&file) }.context("failed to mmap WAL file")?;
126    if mapped.is_empty() {
127        anyhow::bail!("WAL is empty")
128    }
129
130    let start = Instant::now();
131    let alloc_before = alloc_counter::snapshot();
132
133    let mut cursor = 0usize;
134    let mut records_read = 0_u64;
135    let mut defaults_ok = true;
136    let mut zero_copy_ok = true;
137
138    let base_addr = mapped.as_ptr() as usize;
139    let end_addr = base_addr + mapped.len();
140
141    while cursor < mapped.len() {
142        if cursor + 4 > mapped.len() {
143            anyhow::bail!("truncated record size prefix at byte offset {}", cursor);
144        }
145
146        let size = u32::from_le_bytes(
147            mapped[cursor..cursor + 4]
148                .try_into()
149                .expect("prefix slice is always 4 bytes"),
150        ) as usize;
151        let total_size = 4 + size;
152        if cursor + total_size > mapped.len() {
153            anyhow::bail!("record overflows WAL bounds at byte offset {}", cursor);
154        }
155
156        let record_buf = &mapped[cursor..cursor + total_size];
157        let record = flatbuffers::size_prefixed_root::<v2_generated::RecordV2<'_>>(record_buf)
158            .map_err(|err| {
159                anyhow::anyhow!(
160                    "failed to parse size-prefixed record {} at offset {}: {}",
161                    records_read,
162                    cursor,
163                    err
164                )
165            })?;
166
167        if record.retain_until_date() != 0 {
168            defaults_ok = false;
169        }
170
171        let bucket_name = record
172            .bucket_name()
173            .ok_or_else(|| anyhow::anyhow!("record {} missing bucket_name", records_read))?;
174        let ptr = bucket_name.as_ptr() as usize;
175        let ptr_end = ptr + bucket_name.len();
176        if ptr < base_addr || ptr_end > end_addr {
177            zero_copy_ok = false;
178        }
179
180        records_read += 1;
181        cursor += total_size;
182    }
183
184    if let Some(expect_records) = config.expect_records {
185        if expect_records != records_read {
186            anyhow::bail!(
187                "record count mismatch: expected {}, got {}",
188                expect_records,
189                records_read
190            );
191        }
192    }
193
194    let elapsed_ms = start.elapsed().as_secs_f64() * 1_000.0;
195    let read_loop_allocations = alloc_counter::allocations_since(alloc_before);
196
197    Ok(ModuleCReadStats {
198        records_read,
199        elapsed_ms,
200        defaults_ok,
201        zero_copy_ok,
202        read_loop_allocations,
203        wal_path: config.wal_path,
204    })
205}
206
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Temporary WAL path that is removed on drop, so a failing assertion or a reader error
    /// does not leave files behind in the system temp directory.
    struct TempWal(PathBuf);

    impl TempWal {
        fn new(prefix: &str) -> Self {
            TempWal(unique_wal_path(prefix))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempWal {
        fn drop(&mut self) {
            // Best-effort: the file may not exist if the test failed before creating it.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn round_trip_v1_writer_v2_reader() {
        let wal = TempWal::new("module-c-roundtrip");

        let write_stats = run_writer(ModuleCWriteConfig {
            records: 1_000,
            wal_path: wal.path().to_path_buf(),
        })
        .expect("writer should succeed");

        assert_eq!(write_stats.records_written, 1_000);
        assert!(write_stats.file_size_bytes > 0);

        let read_stats = run_reader(ModuleCReadConfig {
            wal_path: wal.path().to_path_buf(),
            expect_records: Some(1_000),
        })
        .expect("reader should succeed");

        assert_eq!(read_stats.records_read, 1_000);
        assert!(read_stats.defaults_ok);
        assert!(read_stats.zero_copy_ok);
    }

    #[test]
    #[ignore = "heavy million-record scan"]
    fn million_record_round_trip() {
        let wal = TempWal::new("module-c-million");

        run_writer(ModuleCWriteConfig {
            records: 1_000_000,
            wal_path: wal.path().to_path_buf(),
        })
        .expect("million-record writer should succeed");

        let read_stats = run_reader(ModuleCReadConfig {
            wal_path: wal.path().to_path_buf(),
            expect_records: Some(1_000_000),
        })
        .expect("million-record reader should succeed");

        assert!(read_stats.defaults_ok);
        assert!(read_stats.zero_copy_ok);
    }

    #[test]
    fn writer_rejects_zero_records() {
        let wal = TempWal::new("module-c-zero");

        let err = run_writer(ModuleCWriteConfig {
            records: 0,
            wal_path: wal.path().to_path_buf(),
        })
        .expect_err("zero records must be rejected");

        assert_eq!(err.to_string(), "records must be > 0");
        assert!(
            !wal.path().exists(),
            "no WAL should be created when validation fails"
        );
    }

    #[test]
    fn reader_rejects_truncated_size_prefix() {
        let wal = TempWal::new("module-c-truncated");
        std::fs::write(wal.path(), [0x01_u8, 0x02]).expect("temporary WAL should be writable");

        let err = run_reader(ModuleCReadConfig {
            wal_path: wal.path().to_path_buf(),
            expect_records: None,
        })
        .expect_err("a 2-byte WAL cannot hold a size prefix");

        assert_eq!(
            err.to_string(),
            "truncated record size prefix at byte offset 0"
        );
    }

    #[test]
    fn reader_rejects_record_past_end_of_file() {
        let wal = TempWal::new("module-c-overflow");
        // The prefix claims a 16-byte payload, but only one byte follows it.
        std::fs::write(wal.path(), [0x10_u8, 0, 0, 0, 0xAA])
            .expect("temporary WAL should be writable");

        let err = run_reader(ModuleCReadConfig {
            wal_path: wal.path().to_path_buf(),
            expect_records: None,
        })
        .expect_err("a record larger than the file must be rejected");

        assert_eq!(
            err.to_string(),
            "record overflows WAL bounds at byte offset 0"
        );
    }

    /// Builds a temp-dir path unique to this process and moment, tagged with `prefix`.
    fn unique_wal_path(prefix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should be after epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("{}-{}-{}.bin", prefix, std::process::id(), nanos))
    }
}