Skip to main content

laminar_core/xdp/
loader_linux.rs

//! XDP loader for Linux platforms.
//!
//! This module provides the actual XDP loading and attachment functionality
//! using libbpf-rs when the `xdp` feature is enabled. Without that feature,
//! loading falls back to a stub that produces an inactive loader.
5
6use std::sync::atomic::{AtomicBool, Ordering};
7
8use super::{XdpAttachMode, XdpConfig, XdpError, XdpStats};
9
/// XDP program loader for Linux.
///
/// Loads and attaches XDP programs to network interfaces for packet filtering
/// and CPU steering.
///
/// A loader is either *active* (the attach path succeeded) or *inactive*
/// (XDP disabled in the config, or a fallback after a failure). Use
/// [`XdpLoader::is_active`] to tell the two apart.
///
/// Dropping an active loader calls [`XdpLoader::detach`].
///
/// # Example
///
/// ```rust,ignore
/// use laminar_core::xdp::{XdpConfig, XdpLoader};
///
/// let config = XdpConfig::builder()
///     .enabled(true)
///     .interface("eth0")
///     .port(9999)
///     .build()?;
///
/// let loader = XdpLoader::load_and_attach(&config, 4)?;
/// println!("XDP active: {}", loader.is_active());
///
/// // Get statistics
/// let stats = loader.stats();
/// println!("Redirected: {}", stats.redirected);
/// ```
#[derive(Debug)]
pub struct XdpLoader {
    /// Name of the network interface, copied from the configuration.
    interface: String,
    /// Kernel interface index; `0` for a loader created in inactive mode.
    ifindex: u32,
    /// Whether XDP is active. Set at construction and cleared by `detach`.
    active: AtomicBool,
    /// Number of cores passed to `load_and_attach`; upper bound for CPU steering.
    num_cores: usize,
    /// Locally tracked counters, updated through `record_packet`.
    stats: super::stats::AtomicXdpStats,
    /// Attach mode copied from the configuration.
    attach_mode: XdpAttachMode,
    /// Copy of the configuration the loader was built from.
    // Not read by any method visible in this file, hence the lint allowance.
    #[allow(dead_code)]
    config: XdpConfig,
}
51
52impl XdpLoader {
53    /// Loads and attaches XDP program to network interface.
54    ///
55    /// # Arguments
56    ///
57    /// * `config` - XDP configuration
58    /// * `num_cores` - Number of cores for CPU map
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if:
63    /// - XDP program file not found
64    /// - Permission denied (requires `CAP_NET_ADMIN` or root)
65    /// - Network interface not found
66    /// - BPF program loading fails
67    pub fn load_and_attach(config: &XdpConfig, num_cores: usize) -> Result<Self, XdpError> {
68        config.validate()?;
69
70        if !config.enabled {
71            return Self::create_inactive(config, num_cores);
72        }
73
74        // Check if BPF object file exists
75        if !config.bpf_object_path.exists() {
76            if config.fallback_on_error {
77                tracing::warn!(
78                    "XDP program not found at {:?}, falling back to standard sockets",
79                    config.bpf_object_path
80                );
81                return Self::create_inactive(config, num_cores);
82            }
83            return Err(XdpError::ProgramNotFound(config.bpf_object_path.clone()));
84        }
85
86        // Get interface index
87        let ifindex = Self::get_interface_index(&config.interface)?;
88
89        // Try to load and attach XDP program
90        match Self::do_load_and_attach(config, ifindex, num_cores) {
91            Ok(loader) => Ok(loader),
92            Err(e) => {
93                if config.fallback_on_error {
94                    tracing::warn!("XDP load failed, falling back: {}", e);
95                    Self::create_inactive(config, num_cores)
96                } else {
97                    Err(e)
98                }
99            }
100        }
101    }
102
103    /// Creates an inactive loader (fallback mode).
104    #[allow(clippy::unnecessary_wraps)]
105    fn create_inactive(config: &XdpConfig, num_cores: usize) -> Result<Self, XdpError> {
106        Ok(Self {
107            interface: config.interface.clone(),
108            ifindex: 0,
109            active: AtomicBool::new(false),
110            num_cores,
111            stats: super::stats::AtomicXdpStats::new(),
112            attach_mode: config.attach_mode,
113            config: config.clone(),
114        })
115    }
116
117    /// Actually loads and attaches the XDP program.
118    #[cfg(feature = "xdp")]
119    fn do_load_and_attach(
120        config: &XdpConfig,
121        ifindex: u32,
122        num_cores: usize,
123    ) -> Result<Self, XdpError> {
124        use libbpf_rs::{MapCore, MapFlags, ObjectBuilder};
125
126        // Load BPF object
127        let mut obj = ObjectBuilder::default()
128            .open_file(&config.bpf_object_path)
129            .map_err(|e| XdpError::LoadFailed(e.to_string()))?
130            .load()
131            .map_err(|e| XdpError::LoadFailed(e.to_string()))?;
132
133        // Get XDP program by iterating progs()
134        let prog = obj
135            .progs_mut()
136            .find(|p| p.name() == "laminar_ingress")
137            .ok_or_else(|| XdpError::MapNotFound("laminar_ingress program".to_string()))?;
138
139        // Attach to interface
140        #[allow(clippy::cast_possible_wrap)]
141        let _link = prog
142            .attach_xdp(ifindex as i32)
143            .map_err(|e: libbpf_rs::Error| XdpError::AttachFailed(e.to_string()))?;
144
145        // Configure CPU map if present
146        if let Some(cpu_map) = obj.maps_mut().find(|m| m.name() == "cpu_map") {
147            #[allow(clippy::cast_possible_truncation)]
148            for cpu in 0..num_cores {
149                let key = (cpu as u32).to_ne_bytes();
150                // CpumapValue format: qsize as u32
151                let qsize = config.cpu_queue_size;
152                let value = qsize.to_ne_bytes();
153                cpu_map
154                    .update(&key, &value, MapFlags::ANY)
155                    .map_err(|e: libbpf_rs::Error| XdpError::MapUpdateFailed(e.to_string()))?;
156            }
157        }
158
159        tracing::info!(
160            "XDP program attached to interface {} (index {})",
161            config.interface,
162            ifindex
163        );
164
165        Ok(Self {
166            interface: config.interface.clone(),
167            ifindex,
168            active: AtomicBool::new(true),
169            num_cores,
170            stats: super::stats::AtomicXdpStats::new(),
171            attach_mode: config.attach_mode,
172            config: config.clone(),
173        })
174    }
175
176    /// Stub for when xdp feature is not enabled.
177    #[cfg(not(feature = "xdp"))]
178    fn do_load_and_attach(
179        config: &XdpConfig,
180        _ifindex: u32,
181        num_cores: usize,
182    ) -> Result<Self, XdpError> {
183        tracing::warn!("XDP feature not enabled, using stub implementation");
184        Self::create_inactive(config, num_cores)
185    }
186
187    /// Gets the interface index from the interface name.
188    fn get_interface_index(interface: &str) -> Result<u32, XdpError> {
189        use std::ffi::CString;
190
191        let c_interface = CString::new(interface)
192            .map_err(|_| XdpError::InterfaceNotFound(interface.to_string()))?;
193
194        // SAFETY: We're calling a standard libc function with a valid CString
195        let ifindex = unsafe { libc::if_nametoindex(c_interface.as_ptr()) };
196
197        if ifindex == 0 {
198            Err(XdpError::InterfaceNotFound(interface.to_string()))
199        } else {
200            Ok(ifindex)
201        }
202    }
203
204    /// Returns true if XDP is actually active.
205    #[must_use]
206    pub fn is_active(&self) -> bool {
207        self.active.load(Ordering::Acquire)
208    }
209
210    /// Returns XDP statistics.
211    #[must_use]
212    pub fn stats(&self) -> XdpStats {
213        if self.is_active() {
214            self.read_bpf_stats()
215        } else {
216            self.stats.snapshot()
217        }
218    }
219
220    /// Reads statistics from BPF maps.
221    #[cfg(feature = "xdp")]
222    fn read_bpf_stats(&self) -> XdpStats {
223        // In a real implementation, this would read from the BPF stats map
224        // For now, return the local stats
225        self.stats.snapshot()
226    }
227
228    #[cfg(not(feature = "xdp"))]
229    fn read_bpf_stats(&self) -> XdpStats {
230        self.stats.snapshot()
231    }
232
233    /// Updates CPU steering for a partition.
234    ///
235    /// This allows runtime reconfiguration of which CPU handles
236    /// packets for a given partition.
237    ///
238    /// # Errors
239    ///
240    /// Returns `XdpError::InvalidConfig` if the CPU index is out of range.
241    pub fn update_cpu_steering(&self, partition: u32, cpu: u32) -> Result<(), XdpError> {
242        if !self.is_active() {
243            return Ok(());
244        }
245
246        if cpu as usize >= self.num_cores {
247            return Err(XdpError::InvalidConfig(format!(
248                "CPU {} out of range (0..{})",
249                cpu, self.num_cores
250            )));
251        }
252
253        // In a real implementation, this would update the BPF partition map
254        tracing::debug!("XDP steering: partition {} -> CPU {}", partition, cpu);
255        Ok(())
256    }
257
258    /// Detaches the XDP program from the interface.
259    ///
260    /// # Errors
261    ///
262    /// Currently infallible, but may return errors in future implementations
263    /// when actual XDP detachment is performed.
264    pub fn detach(&self) -> Result<(), XdpError> {
265        if !self.is_active() {
266            return Ok(());
267        }
268
269        self.do_detach();
270        self.active.store(false, Ordering::Release);
271
272        tracing::info!("XDP program detached from interface {}", self.interface);
273        Ok(())
274    }
275
    /// Detach hook for builds with the `xdp` feature.
    ///
    /// Currently a no-op: the body contains no code and nothing is logged
    /// here (the caller, `detach`, does the logging).
    #[cfg(feature = "xdp")]
    #[allow(clippy::unused_self)]
    fn do_detach(&self) {
        // Intentionally empty.
        // In newer libbpf-rs, detach is handled by dropping the link, so the
        // actual detach is expected to happen when the XDP link is dropped.
        // NOTE(review): `XdpLoader` has no field holding a link handle, so
        // nothing is released here - confirm where the link is meant to live.
    }
284
    /// Detach hook for builds without the `xdp` feature; does nothing.
    #[cfg(not(feature = "xdp"))]
    #[allow(clippy::unused_self)]
    fn do_detach(&self) {
        // No-op when xdp feature is disabled
    }
290
291    /// Returns the interface name.
292    #[must_use]
293    pub fn interface(&self) -> &str {
294        &self.interface
295    }
296
297    /// Returns the interface index.
298    #[must_use]
299    pub fn ifindex(&self) -> u32 {
300        self.ifindex
301    }
302
303    /// Returns the number of cores configured.
304    #[must_use]
305    pub fn num_cores(&self) -> usize {
306        self.num_cores
307    }
308
309    /// Returns the attach mode.
310    #[must_use]
311    pub fn attach_mode(&self) -> XdpAttachMode {
312        self.attach_mode
313    }
314
315    /// Records a packet being processed (for tracking without XDP).
316    pub fn record_packet(&self, was_valid: bool, bytes: u64) {
317        if was_valid {
318            self.stats.inc_passed();
319        } else {
320            self.stats.inc_invalid();
321        }
322        self.stats.add_bytes(bytes);
323    }
324}
325
326impl Drop for XdpLoader {
327    fn drop(&mut self) {
328        if self.is_active() {
329            if let Err(e) = self.detach() {
330                tracing::error!("Failed to detach XDP on drop: {}", e);
331            }
332        }
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    #[test]
341    fn test_loader_disabled() {
342        let config = XdpConfig {
343            enabled: false,
344            ..Default::default()
345        };
346
347        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
348        assert!(!loader.is_active());
349    }
350
351    #[test]
352    fn test_loader_fallback() {
353        let config = XdpConfig {
354            enabled: true,
355            fallback_on_error: true,
356            bpf_object_path: "/nonexistent/path.o".into(),
357            ..Default::default()
358        };
359
360        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
361        assert!(!loader.is_active());
362    }
363
364    #[test]
365    fn test_loader_no_fallback() {
366        let config = XdpConfig {
367            enabled: true,
368            fallback_on_error: false,
369            bpf_object_path: "/nonexistent/path.o".into(),
370            ..Default::default()
371        };
372
373        let result = XdpLoader::load_and_attach(&config, 4);
374        assert!(result.is_err());
375    }
376
377    #[test]
378    fn test_interface_index_loopback() {
379        // Loopback interface should always exist
380        let result = XdpLoader::get_interface_index("lo");
381        assert!(result.is_ok());
382        assert!(result.unwrap() > 0);
383    }
384
385    #[test]
386    fn test_interface_index_nonexistent() {
387        let result = XdpLoader::get_interface_index("nonexistent_interface_xyz");
388        assert!(result.is_err());
389    }
390
391    #[test]
392    fn test_loader_stats() {
393        let config = XdpConfig {
394            enabled: false,
395            ..Default::default()
396        };
397
398        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
399        loader.record_packet(true, 100);
400        loader.record_packet(false, 50);
401
402        let stats = loader.stats();
403        assert_eq!(stats.passed, 1);
404        assert_eq!(stats.invalid, 1);
405        assert_eq!(stats.bytes_processed, 150);
406    }
407
408    #[test]
409    fn test_cpu_steering_inactive() {
410        let config = XdpConfig {
411            enabled: false,
412            ..Default::default()
413        };
414
415        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
416        // Should succeed as no-op
417        loader.update_cpu_steering(0, 0).unwrap();
418    }
419
420    #[test]
421    fn test_cpu_steering_invalid_cpu() {
422        let config = XdpConfig {
423            enabled: true,
424            fallback_on_error: true,
425            ..Default::default()
426        };
427
428        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
429        // When active, invalid CPU should fail
430        // But since we're in fallback mode, it's inactive
431        assert!(!loader.is_active());
432    }
433
434    #[test]
435    fn test_detach_inactive() {
436        let config = XdpConfig {
437            enabled: false,
438            ..Default::default()
439        };
440
441        let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
442        loader.detach().unwrap(); // Should succeed as no-op
443    }
444}