arcbox-net 0.1.4

High-performance network stack for ArcBox
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
//! macOS NAT network implementation.
//!
//! This module provides NAT networking for VMs on macOS using the
//! high-performance datapath components.

use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::Arc;

use crate::datapath::{
    DEFAULT_POOL_CAPACITY, DEFAULT_RING_CAPACITY, DatapathStats, LockFreeRing, PacketPool,
};
use crate::error::{NetError, Result};
use crate::nat::{IpAllocator, NatConfig};
use crate::nat_engine::{NatDirection, NatEngine, NatEngineConfig, NatResult};

use super::{DarwinNetEndpoint, DarwinNetMode, generate_mac};

/// macOS NAT network.
///
/// Provides NAT networking for VMs using a high-performance datapath
/// with zero-copy packet handling and lock-free queues.
///
/// The value owns the NAT engine, the IP allocator and the endpoint table.
/// The packet pool and both rings are held behind `Arc` handles, which the
/// accessor methods hand out by reference.
pub struct DarwinNatNetwork {
    /// Network configuration this network was created with.
    config: NatConfig,
    /// NAT engine for packet translation.
    nat_engine: NatEngine,
    /// Packet buffer pool (shared handle).
    packet_pool: Arc<PacketPool>,
    /// TX ring (Guest -> Host).
    ///
    /// Entries are `u32` values; the poller uses them as buffer indices
    /// into the packet pool.
    tx_ring: Arc<LockFreeRing<u32>>,
    /// RX ring (Host -> Guest).
    ///
    /// Entries are `u32` values; the poller uses them as buffer indices
    /// into the packet pool.
    rx_ring: Arc<LockFreeRing<u32>>,
    /// IP allocator used to assign addresses to endpoints.
    ip_allocator: IpAllocator,
    /// Connected endpoints, keyed by endpoint ID.
    endpoints: HashMap<String, DarwinNetEndpoint>,
    /// Performance statistics.
    stats: DatapathStats,
    /// Running state: set by `start`, cleared by `stop`.
    running: bool,
}

impl DarwinNatNetwork {
    /// Creates a new Darwin NAT network.
    ///
    /// # Errors
    ///
    /// Returns an error if resource allocation fails.
    pub fn new(config: NatConfig) -> Result<Self> {
        let nat_config = NatEngineConfig::new(config.gateway)
            .with_port_range(49152, 65535)
            .with_timeout(300);

        let packet_pool = Arc::new(PacketPool::new(DEFAULT_POOL_CAPACITY)?);
        let tx_ring = Arc::new(LockFreeRing::new(DEFAULT_RING_CAPACITY));
        let rx_ring = Arc::new(LockFreeRing::new(DEFAULT_RING_CAPACITY));

        // Calculate DHCP range
        let mut ip_allocator = IpAllocator::new(config.dhcp_start, config.dhcp_end);

        // Reserve gateway IP
        ip_allocator.allocate_specific(config.gateway);

        Ok(Self {
            config,
            nat_engine: NatEngine::new(&nat_config),
            packet_pool,
            tx_ring,
            rx_ring,
            ip_allocator,
            endpoints: HashMap::new(),
            stats: DatapathStats::new(),
            running: false,
        })
    }

    /// Creates a NAT network with default configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if resource allocation fails.
    pub fn with_defaults() -> Result<Self> {
        Self::new(NatConfig::default())
    }

    /// Returns the network configuration.
    #[must_use]
    pub fn config(&self) -> &NatConfig {
        &self.config
    }

    /// Returns the gateway IP address.
    #[must_use]
    pub fn gateway(&self) -> Ipv4Addr {
        self.config.gateway
    }

    /// Creates a new endpoint for a VM.
    ///
    /// # Errors
    ///
    /// Returns an error if IP allocation fails or endpoint already exists.
    pub fn create_endpoint(&mut self, id: &str) -> Result<&DarwinNetEndpoint> {
        if self.endpoints.contains_key(id) {
            return Err(NetError::config(format!(
                "endpoint '{}' already exists",
                id
            )));
        }

        let ip = self
            .ip_allocator
            .allocate()
            .ok_or_else(|| NetError::AddressAllocation("no available IP addresses".to_string()))?;

        let mac = generate_mac();
        let mut endpoint = DarwinNetEndpoint::new(id.to_string(), mac, DarwinNetMode::Nat);
        endpoint.set_ip(ip);

        self.endpoints.insert(id.to_string(), endpoint);

        Ok(self.endpoints.get(id).unwrap())
    }

    /// Removes an endpoint.
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint doesn't exist.
    pub fn remove_endpoint(&mut self, id: &str) -> Result<()> {
        let endpoint = self
            .endpoints
            .remove(id)
            .ok_or_else(|| NetError::config(format!("endpoint '{}' not found", id)))?;

        if let Some(ip) = endpoint.ip {
            self.ip_allocator.release(ip);
        }

        Ok(())
    }

    /// Returns an endpoint by ID.
    #[must_use]
    pub fn get_endpoint(&self, id: &str) -> Option<&DarwinNetEndpoint> {
        self.endpoints.get(id)
    }

    /// Returns all endpoints.
    #[must_use = "returns an iterator over endpoints without modifying the NAT engine"]
    pub fn endpoints(&self) -> impl Iterator<Item = &DarwinNetEndpoint> {
        self.endpoints.values()
    }

    /// Returns the number of connected endpoints.
    #[must_use]
    pub fn endpoint_count(&self) -> usize {
        self.endpoints.len()
    }

    /// Starts the NAT network.
    ///
    /// # Errors
    ///
    /// Returns an error if the network is already running.
    pub fn start(&mut self) -> Result<()> {
        if self.running {
            return Err(NetError::config("network already running".to_string()));
        }

        // Set internal network in NAT engine
        self.nat_engine
            .set_internal_network(self.config.subnet, self.config.prefix_len);

        self.running = true;
        Ok(())
    }

    /// Stops the NAT network.
    pub fn stop(&mut self) {
        self.running = false;
    }

    /// Returns true if the network is running.
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.running
    }

    /// Processes a packet from a guest (TX direction).
    ///
    /// # Safety
    ///
    /// The packet data must be valid.
    ///
    /// # Errors
    ///
    /// Returns an error if translation fails.
    pub unsafe fn process_tx_packet(&mut self, packet: &mut [u8]) -> Result<NatResult> {
        self.stats.record_tx(1, packet.len() as u64);

        // Safety: packet data is valid per function contract
        match unsafe { self.nat_engine.translate(packet, NatDirection::Outbound) } {
            Ok(result) => {
                if result == NatResult::Translated {
                    self.stats.record_nat_translation(true);
                }
                Ok(result)
            }
            Err(e) => {
                self.stats.record_tx_error();
                Err(NetError::Nat(e.to_string()))
            }
        }
    }

    /// Processes a packet to a guest (RX direction).
    ///
    /// # Safety
    ///
    /// The packet data must be valid.
    ///
    /// # Errors
    ///
    /// Returns an error if translation fails.
    pub unsafe fn process_rx_packet(&mut self, packet: &mut [u8]) -> Result<NatResult> {
        self.stats.record_rx(1, packet.len() as u64);

        // Safety: packet data is valid per function contract
        match unsafe { self.nat_engine.translate(packet, NatDirection::Inbound) } {
            Ok(result) => {
                if result == NatResult::Translated {
                    self.stats.record_nat_translation(false);
                }
                Ok(result)
            }
            Err(e) => {
                self.stats.record_rx_error();
                Err(NetError::Nat(e.to_string()))
            }
        }
    }

    /// Returns the packet pool.
    #[must_use]
    pub fn packet_pool(&self) -> &Arc<PacketPool> {
        &self.packet_pool
    }

    /// Returns the TX ring.
    #[must_use]
    pub fn tx_ring(&self) -> &Arc<LockFreeRing<u32>> {
        &self.tx_ring
    }

    /// Returns the RX ring.
    #[must_use]
    pub fn rx_ring(&self) -> &Arc<LockFreeRing<u32>> {
        &self.rx_ring
    }

    /// Returns performance statistics.
    #[must_use]
    pub fn stats(&self) -> &DatapathStats {
        &self.stats
    }

    /// Returns NAT engine statistics.
    #[must_use]
    pub fn nat_stats(&self) -> &crate::nat_engine::conntrack::ConnTrackStats {
        self.nat_engine.conntrack().stats()
    }

    /// Expires old NAT connections.
    pub fn expire_connections(&mut self) -> usize {
        let count = self.nat_engine.expire_connections();
        for _ in 0..count {
            self.stats.record_nat_connection_expired();
        }
        count
    }

    /// Returns the number of active NAT connections.
    #[must_use]
    pub fn connection_count(&self) -> usize {
        self.nat_engine.connection_count()
    }

    /// Performs periodic maintenance.
    pub fn maintenance(&mut self) {
        // Expire old connections every call
        self.expire_connections();
    }
}

impl std::fmt::Debug for DarwinNatNetwork {
    /// Writes a compact summary: gateway address, endpoint count, active
    /// NAT connection count and running flag. Other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let endpoint_total = self.endpoints.len();
        let connection_total = self.nat_engine.connection_count();

        let mut summary = f.debug_struct("DarwinNatNetwork");
        summary.field("gateway", &self.config.gateway);
        summary.field("endpoints", &endpoint_total);
        summary.field("connections", &connection_total);
        summary.field("running", &self.running);
        summary.finish()
    }
}

impl Drop for DarwinNatNetwork {
    fn drop(&mut self) {
        self.stop();
    }
}

/// High-performance datapath poller.
///
/// This structure manages the polling loop for the network datapath,
/// processing packets in batches for optimal performance.
///
/// The poller holds only its own tuning and backoff state; the network to
/// poll is passed to each `poll_once` call.
pub struct DatapathPoller {
    /// Batch size for packet processing: bounds how many entries are
    /// dequeued from each ring in a single poll.
    batch_size: usize,
    /// Polling strategy.
    strategy: PollingStrategy,
    /// Spin count for adaptive polling: consecutive polls that found no
    /// work. Reset to zero whenever a poll does work.
    spin_count: u32,
    /// Maximum spin count before yielding the thread (adaptive strategy).
    max_spins: u32,
}

/// Polling strategy for the datapath.
///
/// `Adaptive` is the default variant.
///
/// NOTE(review): `DatapathPoller::poll_once` only special-cases `Adaptive`
/// (it yields the thread after too many idle polls); `Busy` and
/// `EventDriven` are handled identically there. Confirm whether the
/// event-driven behaviour is implemented by the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PollingStrategy {
    /// Always poll (lowest latency, highest CPU).
    Busy,
    /// Poll with adaptive backoff.
    #[default]
    Adaptive,
    /// Event-driven (lowest CPU, higher latency).
    EventDriven,
}

impl Default for DatapathPoller {
    fn default() -> Self {
        Self::new()
    }
}

impl DatapathPoller {
    /// Capacity of the on-stack index buffers used by
    /// [`poll_once`](Self::poll_once); also the default batch size.
    const MAX_BATCH_SIZE: usize = 64;

    /// Creates a new datapath poller.
    ///
    /// Defaults: batch size 64, adaptive strategy, and a yield after more
    /// than 1000 consecutive idle polls.
    #[must_use]
    pub fn new() -> Self {
        Self {
            batch_size: Self::MAX_BATCH_SIZE,
            strategy: PollingStrategy::Adaptive,
            spin_count: 0,
            max_spins: 1000,
        }
    }

    /// Sets the batch size.
    ///
    /// [`poll_once`](Self::poll_once) dequeues at most this many entries per
    /// ring per call, capped at 64 (the size of its internal buffers). Larger
    /// values are accepted but behave like 64.
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the polling strategy.
    #[must_use]
    pub fn with_strategy(mut self, strategy: PollingStrategy) -> Self {
        self.strategy = strategy;
        self
    }

    /// Polls the datapath once.
    ///
    /// Dequeues one batch of buffer indices from the TX ring and then one
    /// from the RX ring. For every index the corresponding pool buffer is
    /// run through the network's TX or RX processing and then returned to
    /// the pool. Per-packet processing errors are not propagated; they are
    /// accounted for by the network's own statistics.
    ///
    /// With the adaptive strategy, the thread is yielded once more than
    /// `max_spins` consecutive polls have found no work.
    ///
    /// Returns true if any work was done.
    pub fn poll_once(&mut self, network: &mut DarwinNatNetwork) -> bool {
        let mut work_done = false;

        // The index buffers are fixed-size, so never slice past them even if
        // a larger batch size was configured (that used to panic).
        let batch_limit = self.batch_size.min(Self::MAX_BATCH_SIZE);

        // Process TX packets (Guest -> Host)
        let mut tx_batch = [0u32; Self::MAX_BATCH_SIZE];
        let tx_count = network
            .tx_ring()
            .dequeue_batch(&mut tx_batch[..batch_limit]);
        if tx_count > 0 {
            work_done = true;
            network.stats().record_batch_size(tx_count);

            for &buf_idx in &tx_batch[..tx_count] {
                // Get buffer pointer and length first to avoid borrow conflicts
                // between the pool borrow and the `&mut network` call below.
                // SAFETY: buf_idx comes from our ring buffer and is valid.
                let (ptr, len) = unsafe {
                    let buffer = network.packet_pool().get_mut(buf_idx);
                    (buffer.as_mut_ptr(), buffer.len())
                };

                // Process packet with raw pointer access.
                // SAFETY: ptr is valid and exclusively owned during this operation.
                unsafe {
                    let slice = std::slice::from_raw_parts_mut(ptr, len);
                    // Best effort: failures are recorded in the network's stats.
                    let _ = network.process_tx_packet(slice);
                    network.packet_pool().free_by_index(buf_idx);
                }
            }
        }

        // Process RX packets (Host -> Guest)
        let mut rx_batch = [0u32; Self::MAX_BATCH_SIZE];
        let rx_count = network
            .rx_ring()
            .dequeue_batch(&mut rx_batch[..batch_limit]);
        if rx_count > 0 {
            work_done = true;
            network.stats().record_batch_size(rx_count);

            for &buf_idx in &rx_batch[..rx_count] {
                // Get buffer pointer and length first to avoid borrow conflicts
                // between the pool borrow and the `&mut network` call below.
                // SAFETY: buf_idx comes from our ring buffer and is valid.
                let (ptr, len) = unsafe {
                    let buffer = network.packet_pool().get_mut(buf_idx);
                    (buffer.as_mut_ptr(), buffer.len())
                };

                // Process packet with raw pointer access.
                // SAFETY: ptr is valid and exclusively owned during this operation.
                unsafe {
                    let slice = std::slice::from_raw_parts_mut(ptr, len);
                    // Best effort: failures are recorded in the network's stats.
                    let _ = network.process_rx_packet(slice);
                    network.packet_pool().free_by_index(buf_idx);
                }
            }
        }

        // Update polling state
        network.stats().record_poll(work_done);

        if work_done {
            self.spin_count = 0;
        } else {
            // Saturate instead of overflowing: the non-adaptive strategies
            // never reset the counter while the datapath stays idle.
            self.spin_count = self.spin_count.saturating_add(1);

            // Adaptive backoff
            if self.strategy == PollingStrategy::Adaptive && self.spin_count > self.max_spins {
                std::thread::yield_now();
                self.spin_count = 0;
            }
        }

        work_done
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A default network starts stopped, has no endpoints, and uses the
    /// default gateway address.
    #[test]
    fn test_darwin_nat_network_creation() {
        let net = DarwinNatNetwork::with_defaults().unwrap();

        assert_eq!(net.gateway(), Ipv4Addr::new(192, 168, 64, 1));
        assert_eq!(net.endpoint_count(), 0);
        assert!(!net.is_running());
    }

    /// Endpoints can be created and removed, and removing an unknown
    /// endpoint is reported as an error.
    #[test]
    fn test_endpoint_lifecycle() {
        let mut net = DarwinNatNetwork::with_defaults().unwrap();

        // The first endpoint gets an address and NAT mode.
        {
            let first = net.create_endpoint("vm1").unwrap();
            assert_eq!(first.mode, DarwinNetMode::Nat);
            assert!(first.ip.is_some());
        }
        assert_eq!(net.endpoint_count(), 1);

        // A second endpoint is tracked alongside the first.
        net.create_endpoint("vm2").unwrap();
        assert_eq!(net.endpoint_count(), 2);

        // Removing the first leaves only the second.
        net.remove_endpoint("vm1").unwrap();
        assert_eq!(net.endpoint_count(), 1);

        // Removing it again fails because it no longer exists.
        let second_removal = net.remove_endpoint("vm1");
        assert!(second_removal.is_err());
    }

    /// Start/stop toggles the running flag, and a second start is rejected.
    #[test]
    fn test_start_stop() {
        let mut net = DarwinNatNetwork::with_defaults().unwrap();
        assert!(!net.is_running());

        net.start().unwrap();
        assert!(net.is_running());

        // Starting an already running network must fail.
        let restart = net.start();
        assert!(restart.is_err());

        net.stop();
        assert!(!net.is_running());
    }

    /// Builder methods store the requested batch size and strategy.
    #[test]
    fn test_poller_creation() {
        let configured = DatapathPoller::new()
            .with_batch_size(32)
            .with_strategy(PollingStrategy::Busy);

        assert_eq!(configured.strategy, PollingStrategy::Busy);
        assert_eq!(configured.batch_size, 32);
    }
}