// stackforge_core/anonymize/engine.rs

//! Anonymization engine — session-scoped orchestrator.
//!
//! [`AnonymizationEngine`] holds the cryptographic state (Crypto-PAn
//! instance, salted hasher, RNG) for a single anonymization session and
//! exposes methods to anonymize a single [`ConversationState`] or a whole
//! batch of them in place.

use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;

use super::crypto_pan::CryptoPan;
use super::hash::SaltedHasher;
use super::policy::{
    AnonymizationPolicy, IpAnonymizationMode, PayloadAnonymizationMode,
    PortAnonymizationMode, TcpSeqAnonymizationMode, TimestampAnonymizationMode,
};
use super::port::generalize_port;
use super::timestamp::TimestampAnonymizer;
use crate::flow::state::{ConversationState, ProtocolState};
use crate::flow::tcp_reassembly::TcpReassembler;

23/// Session-scoped anonymization engine.
24///
25/// Holds all cryptographic state and caches for a single anonymization
26/// run. Create one engine per dataset export; reusing the same engine
27/// ensures consistent mappings across all flows.
28#[derive(Debug)]
29pub struct AnonymizationEngine {
30    policy: AnonymizationPolicy,
31    crypto_pan: Option<CryptoPan>,
32    hasher: SaltedHasher,
33    timestamp_anon: Option<TimestampAnonymizer>,
34    rng: StdRng,
35}
36
37impl AnonymizationEngine {
38    /// Create a new engine from the given policy.
39    ///
40    /// Any unspecified keys/salts in the policy are generated randomly.
41    #[must_use]
42    pub fn new(policy: AnonymizationPolicy) -> Self {
43        let mut rng = StdRng::from_os_rng();
44
45        // Initialize Crypto-PAn if needed
46        let crypto_pan = if policy.ip_mode == IpAnonymizationMode::CryptoPan {
47            let key = policy.crypto_pan_key.unwrap_or_else(|| {
48                let mut k = [0u8; 32];
49                rng.fill(&mut k);
50                k
51            });
52            Some(CryptoPan::new(&key))
53        } else {
54            None
55        };
56
57        // Initialize salted hasher
58        let salt = policy.hash_salt.unwrap_or_else(|| {
59            let mut s = [0u8; 32];
60            rng.fill(&mut s);
61            s
62        });
63        let hasher = SaltedHasher::new(salt);
64
65        // Initialize timestamp anonymizer
66        let timestamp_anon = match policy.timestamp_mode {
67            TimestampAnonymizationMode::None => None,
68            TimestampAnonymizationMode::EpochShift => {
69                Some(TimestampAnonymizer::epoch_shift_only(&mut rng))
70            },
71            TimestampAnonymizationMode::EpochShiftWithJitter { jitter_ms } => {
72                Some(TimestampAnonymizer::with_jitter(jitter_ms, &mut rng))
73            },
74        };
75
76        Self {
77            policy,
78            crypto_pan,
79            hasher,
80            timestamp_anon,
81            rng,
82        }
83    }
84
85    /// Anonymize a batch of conversations in place.
86    pub fn anonymize_conversations(&mut self, conversations: &mut [ConversationState]) {
87        for conv in conversations.iter_mut() {
88            self.anonymize_conversation(conv);
89        }
90    }
91
92    /// Anonymize a single conversation in place.
93    pub fn anonymize_conversation(&mut self, conv: &mut ConversationState) {
94        self.anonymize_ips(conv);
95        self.anonymize_ports(conv);
96        self.anonymize_timestamps(conv);
97        self.anonymize_tcp_seq(conv);
98        self.anonymize_payload(conv);
99    }
100
101    /// The underlying policy.
102    #[must_use]
103    pub fn policy(&self) -> &AnonymizationPolicy {
104        &self.policy
105    }
106
107    /// The salted hasher (for packet-level anonymization of MACs, etc.).
108    #[must_use]
109    pub fn hasher(&self) -> &SaltedHasher {
110        &self.hasher
111    }
112
113    // ---- private helpers ----
114
115    fn anonymize_ips(&mut self, conv: &mut ConversationState) {
116        if let Some(ref mut cp) = self.crypto_pan {
117            conv.key.addr_a = cp.anonymize_ip(conv.key.addr_a);
118            conv.key.addr_b = cp.anonymize_ip(conv.key.addr_b);
119        }
120    }
121
122    fn anonymize_ports(&self, conv: &mut ConversationState) {
123        match self.policy.port_mode {
124            PortAnonymizationMode::None => {},
125            PortAnonymizationMode::PreserveWellKnown => {
126                // Determine which side is the "destination" (lower port = server heuristic).
127                // In the canonical key, addr_a < addr_b. We use port_b as the
128                // "likely server" if it is in the well-known range.
129                let (is_a_dst, is_b_dst) = server_heuristic(conv.key.port_a, conv.key.port_b);
130                conv.key.port_a = generalize_port(conv.key.port_a, true, is_a_dst);
131                conv.key.port_b = generalize_port(conv.key.port_b, true, is_b_dst);
132            },
133            PortAnonymizationMode::Categorize => {
134                conv.key.port_a = generalize_port(conv.key.port_a, false, false);
135                conv.key.port_b = generalize_port(conv.key.port_b, false, false);
136            },
137        }
138    }
139
140    fn anonymize_timestamps(&mut self, conv: &mut ConversationState) {
141        if let Some(ref mut ts_anon) = self.timestamp_anon {
142            conv.start_time = ts_anon.anonymize(conv.start_time);
143            conv.last_seen = ts_anon.anonymize(conv.last_seen);
144            conv.forward.first_seen = ts_anon.anonymize(conv.forward.first_seen);
145            conv.forward.last_seen = ts_anon.anonymize(conv.forward.last_seen);
146            conv.reverse.first_seen = ts_anon.anonymize(conv.reverse.first_seen);
147            conv.reverse.last_seen = ts_anon.anonymize(conv.reverse.last_seen);
148        }
149    }
150
151    fn anonymize_tcp_seq(&mut self, conv: &mut ConversationState) {
152        if self.policy.tcp_seq_mode == TcpSeqAnonymizationMode::None {
153            return;
154        }
155        if let ProtocolState::Tcp(ref mut tcp) = conv.protocol_state {
156            // Generate per-flow random offsets for forward and reverse
157            let fwd_offset: u32 = self.rng.random();
158            let rev_offset: u32 = self.rng.random();
159
160            // Offset initial sequence numbers
161            tcp.forward_endpoint.initial_seq = tcp
162                .forward_endpoint
163                .initial_seq
164                .map(|s| s.wrapping_add(fwd_offset));
165            tcp.reverse_endpoint.initial_seq = tcp
166                .reverse_endpoint
167                .initial_seq
168                .map(|s| s.wrapping_add(rev_offset));
169
170            // Offset next_expected_seq and last_ack
171            tcp.forward_endpoint.next_expected_seq = tcp
172                .forward_endpoint
173                .next_expected_seq
174                .wrapping_add(fwd_offset);
175            tcp.forward_endpoint.last_ack = tcp
176                .forward_endpoint
177                .last_ack
178                .wrapping_add(rev_offset);
179
180            tcp.reverse_endpoint.next_expected_seq = tcp
181                .reverse_endpoint
182                .next_expected_seq
183                .wrapping_add(rev_offset);
184            tcp.reverse_endpoint.last_ack = tcp
185                .reverse_endpoint
186                .last_ack
187                .wrapping_add(fwd_offset);
188        }
189    }
190
191    fn anonymize_payload(&self, conv: &mut ConversationState) {
192        if let ProtocolState::Tcp(ref mut tcp) = conv.protocol_state {
193            match self.policy.payload_mode {
194                PayloadAnonymizationMode::None => {},
195                PayloadAnonymizationMode::TruncateAll => {
196                    tcp.reassembler_fwd = TcpReassembler::new();
197                    tcp.reassembler_rev = TcpReassembler::new();
198                },
199                PayloadAnonymizationMode::TruncateTo(n) => {
200                    tcp.reassembler_fwd.truncate_reassembled(n);
201                    tcp.reassembler_rev.truncate_reassembled(n);
202                },
203            }
204        }
205    }
206}
207
/// Heuristic: the lower port is more likely the server/destination.
///
/// Returns `(is_port_a_dst, is_port_b_dst)`; exactly one element is `true`.
///
/// * If exactly one port is in the well-known range (`0..=1023`), that side
///   is the destination.
/// * Otherwise (both well-known or both above 1023) the strictly lower port
///   is the destination; when the ports are equal, `port_b` is chosen.
fn server_heuristic(port_a: u16, port_b: u16) -> (bool, bool) {
    /// Highest port number in the well-known range.
    const WELL_KNOWN_MAX: u16 = 1023;

    let a_is_dst = match (port_a <= WELL_KNOWN_MAX, port_b <= WELL_KNOWN_MAX) {
        (true, false) => true,
        (false, true) => false,
        // Same class on both sides: fall back to "lower port is the server".
        _ => port_a < port_b,
    };
    (a_is_dst, !a_is_dst)
}

#[cfg(test)]
mod tests {
    //! Unit tests for the engine: each test enables one policy knob (or the
    //! `ml_optimized` preset) and checks its visible effect on a fixture.

    use super::*;
    use crate::flow::key::{CanonicalKey, TransportProtocol};
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Build a TCP conversation 192.168.1.100:54321 <-> 10.0.0.1:443 that
    /// starts at t=100s and was last seen at t=200s.
    fn make_test_conv() -> ConversationState {
        let (key, _) = CanonicalKey::new(
            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)),
            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
            54321,
            443,
            TransportProtocol::Tcp,
            None,
        );
        let mut conv = ConversationState::new(key, Duration::from_secs(100));
        conv.last_seen = Duration::from_secs(200);
        conv.forward.first_seen = Duration::from_secs(100);
        conv.forward.last_seen = Duration::from_secs(150);
        conv.reverse.first_seen = Duration::from_secs(101);
        conv.reverse.last_seen = Duration::from_secs(200);
        conv
    }

    /// The default policy must leave addresses and timestamps untouched.
    #[test]
    fn test_noop_policy() {
        let mut conv = make_test_conv();
        let orig_a = conv.key.addr_a;
        let orig_b = conv.key.addr_b;
        let orig_start = conv.start_time;

        let mut engine = AnonymizationEngine::new(AnonymizationPolicy::default());
        engine.anonymize_conversation(&mut conv);

        assert_eq!(conv.key.addr_a, orig_a);
        assert_eq!(conv.key.addr_b, orig_b);
        assert_eq!(conv.start_time, orig_start);
    }

    /// Crypto-PAn mode must change both endpoint addresses.
    #[test]
    fn test_crypto_pan_changes_ips() {
        let mut conv = make_test_conv();
        let orig_a = conv.key.addr_a;
        let orig_b = conv.key.addr_b;

        let mut policy = AnonymizationPolicy::default();
        policy.ip_mode = IpAnonymizationMode::CryptoPan;

        let mut engine = AnonymizationEngine::new(policy);
        engine.anonymize_conversation(&mut conv);

        assert_ne!(conv.key.addr_a, orig_a);
        assert_ne!(conv.key.addr_b, orig_b);
    }

    /// `PreserveWellKnown` must keep port 443 on whichever side holds it.
    #[test]
    fn test_port_preserve_well_known() {
        let mut conv = make_test_conv();

        let mut policy = AnonymizationPolicy::default();
        policy.port_mode = PortAnonymizationMode::PreserveWellKnown;

        let mut engine = AnonymizationEngine::new(policy);
        engine.anonymize_conversation(&mut conv);

        // The well-known port (443) should be preserved on the dst side
        // The ephemeral port (54321) should be generalized
        let has_443 = conv.key.port_a == 443 || conv.key.port_b == 443;
        assert!(has_443, "Well-known port 443 should be preserved");
    }

    /// Epoch shifting must move the start time forward by at least 30 days.
    #[test]
    fn test_timestamp_shift() {
        let mut conv = make_test_conv();
        let orig_start = conv.start_time;

        let mut policy = AnonymizationPolicy::default();
        policy.timestamp_mode = TimestampAnonymizationMode::EpochShift;

        let mut engine = AnonymizationEngine::new(policy);
        engine.anonymize_conversation(&mut conv);

        assert!(conv.start_time > orig_start);
        // Offset should be at least 30 days
        assert!(conv.start_time - orig_start >= Duration::from_secs(30 * 86400));
    }

    /// `TruncateAll` must leave both reassemblers empty.
    #[test]
    fn test_payload_truncate_all() {
        let mut conv = make_test_conv();

        let mut policy = AnonymizationPolicy::default();
        policy.payload_mode = PayloadAnonymizationMode::TruncateAll;

        let mut engine = AnonymizationEngine::new(policy);
        engine.anonymize_conversation(&mut conv);

        // NOTE(review): if the fixture's protocol state is not `Tcp` this
        // test passes vacuously — confirm `ConversationState::new` yields
        // `ProtocolState::Tcp` for a TCP key.
        if let ProtocolState::Tcp(ref tcp) = conv.protocol_state {
            assert_eq!(tcp.reassembler_fwd.reassembled_len(), 0);
            assert_eq!(tcp.reassembler_rev.reassembled_len(), 0);
        }
    }

    /// The `ml_optimized` preset must change IPs and shift timestamps.
    #[test]
    fn test_ml_optimized_full_pipeline() {
        let mut conv = make_test_conv();
        let orig_a = conv.key.addr_a;

        let mut engine = AnonymizationEngine::new(AnonymizationPolicy::ml_optimized());
        engine.anonymize_conversation(&mut conv);

        // IPs changed
        assert_ne!(conv.key.addr_a, orig_a);
        // Timestamps shifted
        assert!(conv.start_time > Duration::from_secs(100));
    }

    /// One engine must map the same original IP identically across a batch.
    #[test]
    fn test_batch_anonymization() {
        let mut convs = vec![make_test_conv(), make_test_conv()];
        let orig_a0 = convs[0].key.addr_a;

        let mut engine = AnonymizationEngine::new(AnonymizationPolicy::ml_optimized());
        engine.anonymize_conversations(&mut convs);

        // Both changed
        assert_ne!(convs[0].key.addr_a, orig_a0);
        // Same original IP should map to same anonymized IP
        assert_eq!(convs[0].key.addr_a, convs[1].key.addr_a);
    }

    /// Covers every branch of `server_heuristic`, including ties.
    #[test]
    fn test_server_heuristic() {
        // Well-known vs ephemeral
        assert_eq!(server_heuristic(443, 54321), (true, false));
        assert_eq!(server_heuristic(54321, 80), (false, true));
        // Boundary of the well-known range
        assert_eq!(server_heuristic(1023, 1024), (true, false));
        assert_eq!(server_heuristic(1024, 1023), (false, true));
        // Both high: lower = server
        assert_eq!(server_heuristic(8080, 54321), (true, false));
        // Both well-known: lower = server
        assert_eq!(server_heuristic(80, 443), (true, false));
        assert_eq!(server_heuristic(443, 80), (false, true));
        // Equal ports: port_b is treated as the server
        assert_eq!(server_heuristic(123, 123), (false, true));
    }
}