Skip to main content

nexus_core/
module_a.rs

1use std::time::Instant;
2
3use anyhow::{Context, Result};
4use ring::aead::{self, Aad, LessSafeKey, Nonce, UnboundKey};
5use zstd::bulk::Compressor;
6
7use crate::alloc_counter;
8
/// Sizing parameters for a single `run` invocation.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared directly
/// (for example in test assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModuleAConfig {
    /// Total size of the generated payload in bytes. `run` rejects zero and
    /// any value that is not a multiple of `chunk_bytes`.
    pub payload_bytes: usize,
    /// Size in bytes of each chunk the payload is split into. `run` rejects
    /// zero.
    pub chunk_bytes: usize,
}
14
/// Measurements reported by a single `run` invocation.
#[derive(Debug, Clone)]
pub struct ModuleAStats {
    /// Size of the processed payload in bytes (copied from the config).
    pub payload_bytes: usize,
    /// Size of each chunk in bytes (copied from the config).
    pub chunk_bytes: usize,
    /// Number of chunks processed (`payload_bytes / chunk_bytes`).
    pub chunks: usize,
    /// Wall-clock duration of the timed loop, in milliseconds.
    pub elapsed_ms: f64,
    /// Payload size in units of 1024 * 1024 bytes divided by elapsed seconds.
    /// Non-finite when the measured elapsed time is zero.
    pub throughput_mb_s: f64,
    /// Total bytes written to the sink by the timed loop (compressed data
    /// plus authentication tags).
    pub total_output_bytes: usize,
    /// Value returned by `alloc_counter::allocations_since` for the timed loop.
    pub hot_loop_allocations: u64,
    /// XOR of the CRC32C of every input chunk.
    pub crc_fold: u32,
}

impl ModuleAStats {
    /// Serializes the stats as a single-line JSON object tagged
    /// `"module":"A"`.
    ///
    /// `elapsed_ms` and `throughput_mb_s` are written with three decimal
    /// places. JSON has no literal for NaN or infinity, so a non-finite value
    /// (e.g. throughput when the elapsed time measured as zero) is written as
    /// `null` to keep the output parseable.
    pub fn to_json(&self) -> String {
        format!(
            "{{\"module\":\"A\",\"payload_bytes\":{},\"chunk_bytes\":{},\"chunks\":{},\"elapsed_ms\":{},\"throughput_mb_s\":{},\"total_output_bytes\":{},\"hot_loop_allocations\":{},\"crc_fold\":{}}}",
            self.payload_bytes,
            self.chunk_bytes,
            self.chunks,
            Self::json_number(self.elapsed_ms),
            Self::json_number(self.throughput_mb_s),
            self.total_output_bytes,
            self.hot_loop_allocations,
            self.crc_fold
        )
    }

    /// Renders `value` with three decimals, or `null` when it is NaN/infinite.
    fn json_number(value: f64) -> String {
        if value.is_finite() {
            format!("{:.3}", value)
        } else {
            "null".to_owned()
        }
    }
}
42
43pub fn run(config: ModuleAConfig) -> Result<ModuleAStats> {
44    if config.chunk_bytes == 0 {
45        anyhow::bail!("chunk_bytes must be > 0");
46    }
47    if config.payload_bytes == 0 {
48        anyhow::bail!("payload_bytes must be > 0");
49    }
50    if config.payload_bytes % config.chunk_bytes != 0 {
51        anyhow::bail!("payload_bytes must be divisible by chunk_bytes");
52    }
53
54    let chunks = config.payload_bytes / config.chunk_bytes;
55    let mut payload = vec![0_u8; config.payload_bytes];
56    fill_semi_compressible(&mut payload);
57
58    let chunk_output_bound = zstd::zstd_safe::compress_bound(config.chunk_bytes)
59        .saturating_add(aead::AES_256_GCM.tag_len());
60    let sink_capacity = chunk_output_bound
61        .checked_mul(chunks)
62        .context("sink capacity overflow")?;
63
64    let mut compressed_buffer = vec![0_u8; zstd::zstd_safe::compress_bound(config.chunk_bytes)];
65    let mut blackhole = vec![0_u8; sink_capacity];
66    let mut compressor = Compressor::new(1).context("failed to initialize zstd compressor")?;
67
68    let key_bytes = [0x42_u8; 32];
69    let unbound = UnboundKey::new(&aead::AES_256_GCM, &key_bytes)
70        .map_err(|_| anyhow::anyhow!("failed to initialize AES-256-GCM key"))?;
71    let key = LessSafeKey::new(unbound);
72
73    // Warm-up once to avoid allocator noise from lazy initialization.
74    let warm_chunk = &payload[..config.chunk_bytes];
75    let warm_compressed = compressor
76        .compress_to_buffer(warm_chunk, &mut compressed_buffer[..])
77        .context("warm-up zstd compression failed")?;
78    let warm_nonce = nonce_for_chunk(0);
79    let warm_tag = key
80        .seal_in_place_separate_tag(
81            warm_nonce,
82            Aad::empty(),
83            &mut compressed_buffer[..warm_compressed],
84        )
85        .map_err(|_| anyhow::anyhow!("warm-up AES-256-GCM encryption failed"))?;
86    let warm_total = warm_compressed + warm_tag.as_ref().len();
87    blackhole[..warm_compressed].copy_from_slice(&compressed_buffer[..warm_compressed]);
88    blackhole[warm_compressed..warm_total].copy_from_slice(warm_tag.as_ref());
89
90    let alloc_before = alloc_counter::snapshot();
91    let start = Instant::now();
92    let mut sink_offset = 0usize;
93    let mut crc_fold = 0_u32;
94
95    for idx in 0..chunks {
96        let start_idx = idx * config.chunk_bytes;
97        let end_idx = start_idx + config.chunk_bytes;
98        let chunk = &payload[start_idx..end_idx];
99
100        let crc = crc32c::crc32c(chunk);
101        crc_fold ^= crc;
102
103        let compressed_len = compressor
104            .compress_to_buffer(chunk, &mut compressed_buffer[..])
105            .with_context(|| format!("zstd compression failed at chunk {}", idx))?;
106
107        let nonce = nonce_for_chunk(idx as u64 + 1);
108        let tag = key
109            .seal_in_place_separate_tag(
110                nonce,
111                Aad::from(crc.to_le_bytes()),
112                &mut compressed_buffer[..compressed_len],
113            )
114            .map_err(|_| anyhow::anyhow!("AES-256-GCM encryption failed at chunk {}", idx))?;
115
116        let encrypted_len = compressed_len + tag.as_ref().len();
117        let end_offset = sink_offset + encrypted_len;
118        if end_offset > blackhole.len() {
119            anyhow::bail!("blackhole buffer overflow");
120        }
121
122        blackhole[sink_offset..sink_offset + compressed_len]
123            .copy_from_slice(&compressed_buffer[..compressed_len]);
124        blackhole[sink_offset + compressed_len..end_offset].copy_from_slice(tag.as_ref());
125        sink_offset = end_offset;
126    }
127
128    let elapsed = start.elapsed();
129    let hot_loop_allocations = alloc_counter::allocations_since(alloc_before);
130    let elapsed_ms = elapsed.as_secs_f64() * 1_000.0;
131    let throughput_mb_s = (config.payload_bytes as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
132
133    Ok(ModuleAStats {
134        payload_bytes: config.payload_bytes,
135        chunk_bytes: config.chunk_bytes,
136        chunks,
137        elapsed_ms,
138        throughput_mb_s,
139        total_output_bytes: sink_offset,
140        hot_loop_allocations,
141        crc_fold,
142    })
143}
144
/// Fills `buffer` with deterministic, partly repetitive bytes.
///
/// Positions whose offset within a 64-byte stride is below the pattern length
/// receive a byte of the ASCII pattern, selected by the absolute position
/// modulo the pattern length. Every other position receives bits 32..40 of
/// `position * multiplier + 1` (wrapping 64-bit arithmetic).
fn fill_semi_compressible(buffer: &mut [u8]) {
    const PATTERN: &[u8] = b"TRACER_BULLET_PATTERN_";
    const STRIDE: usize = 64;
    const MIX_MULTIPLIER: u64 = 6364136223846793005;

    for (position, slot) in buffer.iter_mut().enumerate() {
        *slot = match position % STRIDE {
            // Note: the pattern byte is chosen by absolute position, not by
            // the offset inside the stride.
            lane if lane < PATTERN.len() => PATTERN[position % PATTERN.len()],
            _ => {
                let mixed = (position as u64)
                    .wrapping_mul(MIX_MULTIPLIER)
                    .wrapping_add(1);
                (mixed >> 32) as u8
            }
        };
    }
}
158
159fn nonce_for_chunk(chunk: u64) -> Nonce {
160    let mut nonce = [0_u8; 12];
161    nonce[4..].copy_from_slice(&chunk.to_be_bytes());
162    Nonce::assume_unique_for_key(nonce)
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building the config passed to `run` in these tests.
    fn config(payload_bytes: usize, chunk_bytes: usize) -> ModuleAConfig {
        ModuleAConfig {
            payload_bytes,
            chunk_bytes,
        }
    }

    /// A payload that is not a whole number of chunks is rejected.
    #[test]
    fn rejects_non_divisible_payload() {
        let err = run(config(17, 8)).expect_err("expected configuration validation failure");
        assert!(err.to_string().contains("divisible"));
    }

    /// Eight 1 MiB chunks run end to end and produce sane statistics.
    #[test]
    fn processes_small_payload() {
        let stats = run(config(8 * 1024 * 1024, 1024 * 1024))
            .expect("module-a small payload run should succeed");
        assert_eq!(stats.chunks, 8);
        assert!(stats.total_output_bytes > 0);
        assert!(stats.elapsed_ms >= 0.0);
        assert!(stats.throughput_mb_s > 0.0);
    }

    /// A zero chunk size is rejected before it can be used as a divisor.
    #[test]
    fn rejects_zero_chunk() {
        let err = run(config(1024, 0)).expect_err("expected zero chunk rejection");
        assert!(err.to_string().contains("chunk_bytes"));
    }

    /// An empty payload is rejected; this validation branch was previously
    /// not covered by any test.
    #[test]
    fn rejects_zero_payload() {
        let err = run(config(0, 8)).expect_err("expected zero payload rejection");
        assert!(err.to_string().contains("payload_bytes"));
    }

    /// The JSON rendering carries the module tag and the chunk count.
    #[test]
    fn json_output_contains_module_label() {
        let stats = run(config(1024 * 1024, 256 * 1024)).expect("module-a run should succeed");
        let json = stats.to_json();
        assert!(json.contains("\"module\":\"A\""));
        assert!(json.contains("\"chunks\":4"));
    }
}
213}