1use std::fs::{self, File};
2use std::io::{BufWriter, Write};
3use std::path::PathBuf;
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use memmap2::MmapOptions;
8
9use crate::alloc_counter;
10use crate::generated::{v1_generated, v2_generated};
11
/// Input parameters for [`run_writer`].
#[derive(Clone, Debug)]
pub struct ModuleCWriteConfig {
    /// How many records the writer should emit; `run_writer` rejects `0`.
    pub records: u64,
    /// Path of the WAL file that `run_writer` creates.
    pub wal_path: PathBuf,
}
17
/// Input parameters for [`run_reader`].
#[derive(Clone, Debug)]
pub struct ModuleCReadConfig {
    /// Path of the WAL file that `run_reader` opens and scans.
    pub wal_path: PathBuf,
    /// When `Some(n)`, `run_reader` fails unless exactly `n` records were read;
    /// when `None`, the record count is not checked.
    pub expect_records: Option<u64>,
}
23
/// Summary of one [`run_writer`] invocation.
#[derive(Debug, Clone)]
pub struct ModuleCWriteStats {
    /// Number of records written to the WAL.
    pub records_written: u64,
    /// Wall-clock duration of the write loop plus the final flush, in milliseconds.
    pub elapsed_ms: f64,
    /// Size of the WAL file as reported by the filesystem after the flush.
    pub file_size_bytes: u64,
    /// Path of the WAL file that was written.
    pub wal_path: PathBuf,
}

impl ModuleCWriteStats {
    /// Renders the stats as a single-line JSON object tagged `"module":"C_WRITE"`.
    ///
    /// `elapsed_ms` is printed with three decimals. `wal_path` is rendered via
    /// `Path::display` (lossy for non-UTF-8 paths) and then JSON-escaped, so
    /// paths containing backslashes, double quotes, or control characters still
    /// produce a valid JSON string. A non-finite `elapsed_ms` is not a valid
    /// JSON number and is emitted as Rust formats it.
    pub fn to_json(&self) -> String {
        // Escapes `raw` for embedding between double quotes in a JSON document.
        fn escape_json(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for ch in raw.chars() {
                match ch {
                    '"' => out.push_str("\\\""),
                    '\\' => out.push_str("\\\\"),
                    '\n' => out.push_str("\\n"),
                    '\r' => out.push_str("\\r"),
                    '\t' => out.push_str("\\t"),
                    // Remaining control characters must use the \uXXXX form.
                    c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                    c => out.push(c),
                }
            }
            out
        }

        let wal_path = escape_json(&self.wal_path.display().to_string());
        format!(
            "{{\"module\":\"C_WRITE\",\"records_written\":{},\"elapsed_ms\":{:.3},\"file_size_bytes\":{},\"wal_path\":\"{}\"}}",
            self.records_written, self.elapsed_ms, self.file_size_bytes, wal_path
        )
    }
}
43
/// Summary of one [`run_reader`] invocation.
#[derive(Debug, Clone)]
pub struct ModuleCReadStats {
    /// Number of records parsed from the WAL.
    pub records_read: u64,
    /// Wall-clock duration of the scan, in milliseconds.
    pub elapsed_ms: f64,
    /// `true` when every record reported `retain_until_date() == 0`.
    pub defaults_ok: bool,
    /// `true` when every `bucket_name` slice pointed inside the mapped file.
    pub zero_copy_ok: bool,
    /// Allocation count reported by `alloc_counter` for the scan.
    pub read_loop_allocations: u64,
    /// Path of the WAL file that was read.
    pub wal_path: PathBuf,
}

impl ModuleCReadStats {
    /// Renders the stats as a single-line JSON object tagged `"module":"C_READ"`.
    ///
    /// `elapsed_ms` is printed with three decimals. `wal_path` is rendered via
    /// `Path::display` (lossy for non-UTF-8 paths) and then JSON-escaped, so
    /// paths containing backslashes, double quotes, or control characters still
    /// produce a valid JSON string. A non-finite `elapsed_ms` is not a valid
    /// JSON number and is emitted as Rust formats it.
    pub fn to_json(&self) -> String {
        // Escapes `raw` for embedding between double quotes in a JSON document.
        fn escape_json(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for ch in raw.chars() {
                match ch {
                    '"' => out.push_str("\\\""),
                    '\\' => out.push_str("\\\\"),
                    '\n' => out.push_str("\\n"),
                    '\r' => out.push_str("\\r"),
                    '\t' => out.push_str("\\t"),
                    // Remaining control characters must use the \uXXXX form.
                    c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                    c => out.push(c),
                }
            }
            out
        }

        let wal_path = escape_json(&self.wal_path.display().to_string());
        format!(
            "{{\"module\":\"C_READ\",\"records_read\":{},\"elapsed_ms\":{:.3},\"defaults_ok\":{},\"zero_copy_ok\":{},\"read_loop_allocations\":{},\"wal_path\":\"{}\"}}",
            self.records_read,
            self.elapsed_ms,
            self.defaults_ok,
            self.zero_copy_ok,
            self.read_loop_allocations,
            wal_path
        )
    }
}
67
68pub fn run_writer(config: ModuleCWriteConfig) -> Result<ModuleCWriteStats> {
69 if config.records == 0 {
70 anyhow::bail!("records must be > 0");
71 }
72
73 if let Some(parent) = config.wal_path.parent() {
74 if !parent.as_os_str().is_empty() {
75 fs::create_dir_all(parent).with_context(|| {
76 format!("failed to create parent directory {}", parent.display())
77 })?;
78 }
79 }
80
81 let file = File::create(&config.wal_path)
82 .with_context(|| format!("failed to create WAL at {}", config.wal_path.display()))?;
83 let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
84
85 let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);
86 let bucket_names: Vec<String> = (0..1024).map(|idx| format!("bucket-{idx:04}")).collect();
87
88 let start = Instant::now();
89 for i in 0..config.records {
90 builder.reset();
91
92 let bucket_name = &bucket_names[(i as usize) % bucket_names.len()];
93 let bucket_name_offset = builder.create_string(bucket_name);
94 let args = v1_generated::RecordV1Args {
95 object_id: i + 1,
96 bucket_name: Some(bucket_name_offset),
97 offset: i * 4096,
98 };
99
100 let record = v1_generated::RecordV1::create(&mut builder, &args);
101 v1_generated::finish_size_prefixed_record_v1_buffer(&mut builder, record);
102 writer
103 .write_all(builder.finished_data())
104 .with_context(|| format!("failed writing record {}", i))?;
105 }
106
107 writer.flush().context("failed flushing WAL writer")?;
108
109 let elapsed_ms = start.elapsed().as_secs_f64() * 1_000.0;
110 let file_size_bytes = fs::metadata(&config.wal_path)
111 .with_context(|| format!("failed to stat WAL {}", config.wal_path.display()))?
112 .len();
113
114 Ok(ModuleCWriteStats {
115 records_written: config.records,
116 elapsed_ms,
117 file_size_bytes,
118 wal_path: config.wal_path,
119 })
120}
121
122pub fn run_reader(config: ModuleCReadConfig) -> Result<ModuleCReadStats> {
123 let file = File::open(&config.wal_path)
124 .with_context(|| format!("failed to open WAL {}", config.wal_path.display()))?;
125 let mapped = unsafe { MmapOptions::new().map(&file) }.context("failed to mmap WAL file")?;
126 if mapped.is_empty() {
127 anyhow::bail!("WAL is empty")
128 }
129
130 let start = Instant::now();
131 let alloc_before = alloc_counter::snapshot();
132
133 let mut cursor = 0usize;
134 let mut records_read = 0_u64;
135 let mut defaults_ok = true;
136 let mut zero_copy_ok = true;
137
138 let base_addr = mapped.as_ptr() as usize;
139 let end_addr = base_addr + mapped.len();
140
141 while cursor < mapped.len() {
142 if cursor + 4 > mapped.len() {
143 anyhow::bail!("truncated record size prefix at byte offset {}", cursor);
144 }
145
146 let size = u32::from_le_bytes(
147 mapped[cursor..cursor + 4]
148 .try_into()
149 .expect("prefix slice is always 4 bytes"),
150 ) as usize;
151 let total_size = 4 + size;
152 if cursor + total_size > mapped.len() {
153 anyhow::bail!("record overflows WAL bounds at byte offset {}", cursor);
154 }
155
156 let record_buf = &mapped[cursor..cursor + total_size];
157 let record = flatbuffers::size_prefixed_root::<v2_generated::RecordV2<'_>>(record_buf)
158 .map_err(|err| {
159 anyhow::anyhow!(
160 "failed to parse size-prefixed record {} at offset {}: {}",
161 records_read,
162 cursor,
163 err
164 )
165 })?;
166
167 if record.retain_until_date() != 0 {
168 defaults_ok = false;
169 }
170
171 let bucket_name = record
172 .bucket_name()
173 .ok_or_else(|| anyhow::anyhow!("record {} missing bucket_name", records_read))?;
174 let ptr = bucket_name.as_ptr() as usize;
175 let ptr_end = ptr + bucket_name.len();
176 if ptr < base_addr || ptr_end > end_addr {
177 zero_copy_ok = false;
178 }
179
180 records_read += 1;
181 cursor += total_size;
182 }
183
184 if let Some(expect_records) = config.expect_records {
185 if expect_records != records_read {
186 anyhow::bail!(
187 "record count mismatch: expected {}, got {}",
188 expect_records,
189 records_read
190 );
191 }
192 }
193
194 let elapsed_ms = start.elapsed().as_secs_f64() * 1_000.0;
195 let read_loop_allocations = alloc_counter::allocations_since(alloc_before);
196
197 Ok(ModuleCReadStats {
198 records_read,
199 elapsed_ms,
200 defaults_ok,
201 zero_copy_ok,
202 read_loop_allocations,
203 wal_path: config.wal_path,
204 })
205}
206
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Builds a WAL path inside the OS temp directory whose file name combines
    /// `prefix`, the current process id, and the current time in nanoseconds.
    fn unique_wal_path(prefix: &str) -> PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should be after epoch");
        let file_name = format!(
            "{}-{}-{}.bin",
            prefix,
            std::process::id(),
            since_epoch.as_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Writes 1,000 V1 records, reads them back through the V2 schema, and
    /// checks the counts plus the default-value and zero-copy flags.
    #[test]
    fn round_trip_v1_writer_v2_reader() {
        const RECORDS: u64 = 1_000;
        let wal_path = unique_wal_path("module-c-roundtrip");

        let writer_config = ModuleCWriteConfig {
            records: RECORDS,
            wal_path: wal_path.clone(),
        };
        let write_stats = run_writer(writer_config).expect("writer should succeed");
        assert_eq!(write_stats.records_written, RECORDS);
        assert!(write_stats.file_size_bytes > 0);

        let reader_config = ModuleCReadConfig {
            wal_path: wal_path.clone(),
            expect_records: Some(RECORDS),
        };
        let read_stats = run_reader(reader_config).expect("reader should succeed");
        assert_eq!(read_stats.records_read, RECORDS);
        assert!(read_stats.defaults_ok);
        assert!(read_stats.zero_copy_ok);

        std::fs::remove_file(wal_path).expect("temporary WAL should be removable");
    }

    /// Same round trip at one million records; ignored by default because of
    /// its cost.
    #[test]
    #[ignore = "heavy million-record scan"]
    fn million_record_round_trip() {
        const RECORDS: u64 = 1_000_000;
        let wal_path = unique_wal_path("module-c-million");

        let writer_config = ModuleCWriteConfig {
            records: RECORDS,
            wal_path: wal_path.clone(),
        };
        run_writer(writer_config).expect("million-record writer should succeed");

        let reader_config = ModuleCReadConfig {
            wal_path: wal_path.clone(),
            expect_records: Some(RECORDS),
        };
        let read_stats = run_reader(reader_config).expect("million-record reader should succeed");
        assert!(read_stats.defaults_ok);
        assert!(read_stats.zero_copy_ok);

        std::fs::remove_file(wal_path).expect("temporary WAL should be removable");
    }
}