1use std::sync::atomic::{AtomicBool, Ordering};
7
8use super::{XdpAttachMode, XdpConfig, XdpError, XdpStats};
9
10#[derive(Debug)]
34pub struct XdpLoader {
35 interface: String,
37 ifindex: u32,
39 active: AtomicBool,
41 num_cores: usize,
43 stats: super::stats::AtomicXdpStats,
45 attach_mode: XdpAttachMode,
47 #[allow(dead_code)]
49 config: XdpConfig,
50}
51
52impl XdpLoader {
53 pub fn load_and_attach(config: &XdpConfig, num_cores: usize) -> Result<Self, XdpError> {
68 config.validate()?;
69
70 if !config.enabled {
71 return Self::create_inactive(config, num_cores);
72 }
73
74 if !config.bpf_object_path.exists() {
76 if config.fallback_on_error {
77 tracing::warn!(
78 "XDP program not found at {:?}, falling back to standard sockets",
79 config.bpf_object_path
80 );
81 return Self::create_inactive(config, num_cores);
82 }
83 return Err(XdpError::ProgramNotFound(config.bpf_object_path.clone()));
84 }
85
86 let ifindex = Self::get_interface_index(&config.interface)?;
88
89 match Self::do_load_and_attach(config, ifindex, num_cores) {
91 Ok(loader) => Ok(loader),
92 Err(e) => {
93 if config.fallback_on_error {
94 tracing::warn!("XDP load failed, falling back: {}", e);
95 Self::create_inactive(config, num_cores)
96 } else {
97 Err(e)
98 }
99 }
100 }
101 }
102
103 #[allow(clippy::unnecessary_wraps)]
105 fn create_inactive(config: &XdpConfig, num_cores: usize) -> Result<Self, XdpError> {
106 Ok(Self {
107 interface: config.interface.clone(),
108 ifindex: 0,
109 active: AtomicBool::new(false),
110 num_cores,
111 stats: super::stats::AtomicXdpStats::new(),
112 attach_mode: config.attach_mode,
113 config: config.clone(),
114 })
115 }
116
117 #[cfg(feature = "xdp")]
119 fn do_load_and_attach(
120 config: &XdpConfig,
121 ifindex: u32,
122 num_cores: usize,
123 ) -> Result<Self, XdpError> {
124 use libbpf_rs::{MapCore, MapFlags, ObjectBuilder};
125
126 let mut obj = ObjectBuilder::default()
128 .open_file(&config.bpf_object_path)
129 .map_err(|e| XdpError::LoadFailed(e.to_string()))?
130 .load()
131 .map_err(|e| XdpError::LoadFailed(e.to_string()))?;
132
133 let prog = obj
135 .progs_mut()
136 .find(|p| p.name() == "laminar_ingress")
137 .ok_or_else(|| XdpError::MapNotFound("laminar_ingress program".to_string()))?;
138
139 #[allow(clippy::cast_possible_wrap)]
141 let _link = prog
142 .attach_xdp(ifindex as i32)
143 .map_err(|e: libbpf_rs::Error| XdpError::AttachFailed(e.to_string()))?;
144
145 if let Some(cpu_map) = obj.maps_mut().find(|m| m.name() == "cpu_map") {
147 #[allow(clippy::cast_possible_truncation)]
148 for cpu in 0..num_cores {
149 let key = (cpu as u32).to_ne_bytes();
150 let qsize = config.cpu_queue_size;
152 let value = qsize.to_ne_bytes();
153 cpu_map
154 .update(&key, &value, MapFlags::ANY)
155 .map_err(|e: libbpf_rs::Error| XdpError::MapUpdateFailed(e.to_string()))?;
156 }
157 }
158
159 tracing::info!(
160 "XDP program attached to interface {} (index {})",
161 config.interface,
162 ifindex
163 );
164
165 Ok(Self {
166 interface: config.interface.clone(),
167 ifindex,
168 active: AtomicBool::new(true),
169 num_cores,
170 stats: super::stats::AtomicXdpStats::new(),
171 attach_mode: config.attach_mode,
172 config: config.clone(),
173 })
174 }
175
176 #[cfg(not(feature = "xdp"))]
178 fn do_load_and_attach(
179 config: &XdpConfig,
180 _ifindex: u32,
181 num_cores: usize,
182 ) -> Result<Self, XdpError> {
183 tracing::warn!("XDP feature not enabled, using stub implementation");
184 Self::create_inactive(config, num_cores)
185 }
186
187 fn get_interface_index(interface: &str) -> Result<u32, XdpError> {
189 use std::ffi::CString;
190
191 let c_interface = CString::new(interface)
192 .map_err(|_| XdpError::InterfaceNotFound(interface.to_string()))?;
193
194 let ifindex = unsafe { libc::if_nametoindex(c_interface.as_ptr()) };
196
197 if ifindex == 0 {
198 Err(XdpError::InterfaceNotFound(interface.to_string()))
199 } else {
200 Ok(ifindex)
201 }
202 }
203
204 #[must_use]
206 pub fn is_active(&self) -> bool {
207 self.active.load(Ordering::Acquire)
208 }
209
210 #[must_use]
212 pub fn stats(&self) -> XdpStats {
213 if self.is_active() {
214 self.read_bpf_stats()
215 } else {
216 self.stats.snapshot()
217 }
218 }
219
220 #[cfg(feature = "xdp")]
222 fn read_bpf_stats(&self) -> XdpStats {
223 self.stats.snapshot()
226 }
227
228 #[cfg(not(feature = "xdp"))]
229 fn read_bpf_stats(&self) -> XdpStats {
230 self.stats.snapshot()
231 }
232
233 pub fn update_cpu_steering(&self, partition: u32, cpu: u32) -> Result<(), XdpError> {
242 if !self.is_active() {
243 return Ok(());
244 }
245
246 if cpu as usize >= self.num_cores {
247 return Err(XdpError::InvalidConfig(format!(
248 "CPU {} out of range (0..{})",
249 cpu, self.num_cores
250 )));
251 }
252
253 tracing::debug!("XDP steering: partition {} -> CPU {}", partition, cpu);
255 Ok(())
256 }
257
258 pub fn detach(&self) -> Result<(), XdpError> {
265 if !self.is_active() {
266 return Ok(());
267 }
268
269 self.do_detach();
270 self.active.store(false, Ordering::Release);
271
272 tracing::info!("XDP program detached from interface {}", self.interface);
273 Ok(())
274 }
275
276 #[cfg(feature = "xdp")]
277 #[allow(clippy::unused_self)]
278 fn do_detach(&self) {
279 }
284
285 #[cfg(not(feature = "xdp"))]
286 #[allow(clippy::unused_self)]
287 fn do_detach(&self) {
288 }
290
291 #[must_use]
293 pub fn interface(&self) -> &str {
294 &self.interface
295 }
296
297 #[must_use]
299 pub fn ifindex(&self) -> u32 {
300 self.ifindex
301 }
302
303 #[must_use]
305 pub fn num_cores(&self) -> usize {
306 self.num_cores
307 }
308
309 #[must_use]
311 pub fn attach_mode(&self) -> XdpAttachMode {
312 self.attach_mode
313 }
314
315 pub fn record_packet(&self, was_valid: bool, bytes: u64) {
317 if was_valid {
318 self.stats.inc_passed();
319 } else {
320 self.stats.inc_invalid();
321 }
322 self.stats.add_bytes(bytes);
323 }
324}
325
326impl Drop for XdpLoader {
327 fn drop(&mut self) {
328 if self.is_active() {
329 if let Err(e) = self.detach() {
330 tracing::error!("Failed to detach XDP on drop: {}", e);
331 }
332 }
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use super::*;
339
340 #[test]
341 fn test_loader_disabled() {
342 let config = XdpConfig {
343 enabled: false,
344 ..Default::default()
345 };
346
347 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
348 assert!(!loader.is_active());
349 }
350
351 #[test]
352 fn test_loader_fallback() {
353 let config = XdpConfig {
354 enabled: true,
355 fallback_on_error: true,
356 bpf_object_path: "/nonexistent/path.o".into(),
357 ..Default::default()
358 };
359
360 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
361 assert!(!loader.is_active());
362 }
363
364 #[test]
365 fn test_loader_no_fallback() {
366 let config = XdpConfig {
367 enabled: true,
368 fallback_on_error: false,
369 bpf_object_path: "/nonexistent/path.o".into(),
370 ..Default::default()
371 };
372
373 let result = XdpLoader::load_and_attach(&config, 4);
374 assert!(result.is_err());
375 }
376
377 #[test]
378 fn test_interface_index_loopback() {
379 let result = XdpLoader::get_interface_index("lo");
381 assert!(result.is_ok());
382 assert!(result.unwrap() > 0);
383 }
384
385 #[test]
386 fn test_interface_index_nonexistent() {
387 let result = XdpLoader::get_interface_index("nonexistent_interface_xyz");
388 assert!(result.is_err());
389 }
390
391 #[test]
392 fn test_loader_stats() {
393 let config = XdpConfig {
394 enabled: false,
395 ..Default::default()
396 };
397
398 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
399 loader.record_packet(true, 100);
400 loader.record_packet(false, 50);
401
402 let stats = loader.stats();
403 assert_eq!(stats.passed, 1);
404 assert_eq!(stats.invalid, 1);
405 assert_eq!(stats.bytes_processed, 150);
406 }
407
408 #[test]
409 fn test_cpu_steering_inactive() {
410 let config = XdpConfig {
411 enabled: false,
412 ..Default::default()
413 };
414
415 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
416 loader.update_cpu_steering(0, 0).unwrap();
418 }
419
420 #[test]
421 fn test_cpu_steering_invalid_cpu() {
422 let config = XdpConfig {
423 enabled: true,
424 fallback_on_error: true,
425 ..Default::default()
426 };
427
428 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
429 assert!(!loader.is_active());
432 }
433
434 #[test]
435 fn test_detach_inactive() {
436 let config = XdpConfig {
437 enabled: false,
438 ..Default::default()
439 };
440
441 let loader = XdpLoader::load_and_attach(&config, 4).unwrap();
442 loader.detach().unwrap(); }
444}