1use std::{marker::PhantomData, num::NonZeroU32};
2
3use aya::{Ebpf, EbpfLoader, programs::XdpFlags};
4use push_packet_common::{DEFAULT_RING_BUF_SIZE, FrameKind};
5use xdpilone::{SocketConfig, UmemConfig};
6
7use crate::{
8 RuleError,
9 channels::{self},
10 ebpf::{EbpfVar, xdp_program},
11 engine::{Engine, linear::LinearEngine},
12 error::Error,
13 filter::Filter,
14 interface::Interface,
15 loader::Loader,
16 relay::{Relay, RelayLoader},
17 rules::{Action, Rule, RuleId},
18};
19
/// Name paired with the detected [`FrameKind`] when that value is loaded into
/// the eBPF object during [`TapBuilder::build`].
// NOTE(review): presumably the name of the eBPF-side map/global holding the
// frame kind — confirm against the eBPF program.
const FRAME_KIND_MAP: &str = "FRAME_KIND_MAP";
21
/// Configuration for the packet copy path of a [`Tap`].
///
/// Start from [`CopyConfig::default`], adjust it with the chained setters and
/// hand the result to [`TapBuilder::copy_config`].
///
/// The type is `Debug` and `Clone` so a configuration can be logged and
/// reused across several builders.
#[derive(Debug, Clone)]
pub struct CopyConfig {
    /// Requested ring buffer size; defaults to `DEFAULT_RING_BUF_SIZE`.
    // NOTE(review): the unit is not visible in this file — presumably bytes;
    // confirm against the relay loader.
    pub(crate) ring_buf_size: u32,
    /// Set by [`CopyConfig::force_enabled`]; `false` by default.
    // NOTE(review): presumably turns the copy path on even when no rule
    // requests it — confirm against the relay loader.
    pub(crate) force_enabled: bool,
}
31
32impl Default for CopyConfig {
33 fn default() -> Self {
34 Self {
35 ring_buf_size: DEFAULT_RING_BUF_SIZE,
36 force_enabled: false,
37 }
38 }
39}
40
41impl CopyConfig {
42 #[must_use]
45 pub fn force_enabled(mut self) -> Self {
46 self.force_enabled = true;
47 self
48 }
49
50 #[must_use]
52 pub fn ring_buf_size(mut self, ring_buf_size: u32) -> Self {
53 self.ring_buf_size = ring_buf_size;
54 self
55 }
56}
57
58pub struct RouteConfig {
76 pub(crate) force_enabled: bool,
77 pub(crate) umem_config: UmemConfig,
78 pub(crate) socket_config: SocketConfig,
79 pub(crate) frame_count: u32,
80 pub(crate) queue_id: u32,
81}
82
83impl Default for RouteConfig {
84 fn default() -> Self {
85 Self {
86 force_enabled: false,
87 umem_config: UmemConfig {
88 fill_size: 2048,
89 complete_size: 2048,
90 frame_size: 4096,
91 headroom: 32,
92 flags: 0,
93 },
94 socket_config: SocketConfig {
95 rx_size: NonZeroU32::new(2048),
96 tx_size: NonZeroU32::new(2048),
97 bind_flags: SocketConfig::XDP_BIND_NEED_WAKEUP,
98 },
99 frame_count: 8192,
100 queue_id: 0,
101 }
102 }
103}
104
105impl RouteConfig {
106 #[must_use]
109 pub fn force_enabled(mut self) -> Self {
110 self.force_enabled = true;
111 self
112 }
113
114 #[must_use]
116 pub fn umem_config(mut self, umem_config: UmemConfig) -> Self {
117 self.umem_config = umem_config;
118 self
119 }
120
121 #[must_use]
123 pub fn socket_config(mut self, socket_config: SocketConfig) -> Self {
124 self.socket_config = socket_config;
125 self
126 }
127 #[must_use]
130 pub fn frame_count(mut self, frame_count: u32) -> Self {
131 self.frame_count = frame_count;
132 self
133 }
134
135 #[must_use]
142 pub fn queue_id(mut self, queue_id: u32) -> Self {
143 self.queue_id = queue_id;
144 self
145 }
146}
147
148pub struct TapBuilder<E: Engine = LinearEngine> {
150 interface: Result<Interface, Error>,
151 xdp_flags: XdpFlags,
152 copy_config: CopyConfig,
153 route_config: RouteConfig,
154 rules: Vec<Result<Rule, RuleError>>,
155 _marker: PhantomData<E>,
156}
157
158impl<E: Engine> TapBuilder<E> {
159 pub fn new<I>(interface: I) -> Self
161 where
162 I: TryInto<Interface>,
163 I::Error: Into<Error>,
164 {
165 let interface = interface.try_into().map_err(Into::into);
166 let copy_config = CopyConfig::default();
167 let route_config = RouteConfig::default();
168 let xdp_flags = XdpFlags::default();
169 let rules = vec![];
170 let _marker = PhantomData;
171
172 Self {
173 interface,
174 xdp_flags,
175 copy_config,
176 route_config,
177 rules,
178 #[allow(clippy::used_underscore_binding)]
179 _marker,
180 }
181 }
182
183 #[must_use]
185 pub fn xdp_flags(mut self, xdp_flags: XdpFlags) -> Self {
186 self.xdp_flags = xdp_flags;
187 self
188 }
189
190 #[must_use]
192 pub fn copy_config(mut self, copy_config: CopyConfig) -> Self {
193 self.copy_config = copy_config;
194 self
195 }
196
197 #[must_use]
199 pub fn route_config(mut self, route_config: RouteConfig) -> Self {
200 self.route_config = route_config;
201 self
202 }
203
204 #[must_use]
206 pub fn rule<R>(mut self, rule: R) -> Self
207 where
208 R: TryInto<Rule, Error = RuleError>,
209 {
210 let rule = rule.try_into();
211 self.rules.push(rule);
212 self
213 }
214
215 pub fn build(self) -> Result<Tap<E>, Error> {
224 let Self {
225 interface,
226 xdp_flags,
227 copy_config,
228 route_config,
229 rules,
230 ..
231 } = self;
232 let interface = interface?;
233 let frame_kind = interface.frame_kind()?;
234
235 let mut filter = Filter::default();
236 for (i, rule) in rules.into_iter().enumerate() {
237 let rule = rule.map_err(|e| Error::BuilderRule {
238 index: i,
239 source: e,
240 })?;
241 filter.add(rule);
242 }
243
244 let engine_loader = E::Loader::default();
245 let mut ebpf_loader = EbpfLoader::new();
246
247 let relay_loader = RelayLoader::new(copy_config, route_config, &filter, &interface);
248 relay_loader.configure(&mut ebpf_loader)?;
249 engine_loader.configure(&mut ebpf_loader)?;
250
251 let mut ebpf = ebpf_loader.load(E::EBPF_BYTES).map_err(Error::LoadEbpf)?;
252 let mut engine = engine_loader.load(&mut ebpf)?;
253 let frame_kind = (frame_kind, FRAME_KIND_MAP).load(&mut ebpf)?;
254 let relay = relay_loader.load(&mut ebpf)?;
255
256 for (rule_id, rule) in filter.iter_rules() {
257 engine.add_rule(rule_id, rule)?;
258 }
259
260 let program = xdp_program(&mut ebpf, E::EBPF_PROGRAM_NAME)?;
261 program
262 .attach(interface.name(), xdp_flags)
263 .map_err(|e| Error::attach_program(E::EBPF_PROGRAM_NAME, e))?;
264
265 Ok(Tap {
266 interface,
267 engine,
268 filter,
269 ebpf,
270 frame_kind,
271 relay,
272 })
273 }
274}
275
276pub struct Tap<E: Engine = LinearEngine> {
279 interface: Interface,
280 engine: E,
281 filter: Filter,
282 #[allow(unused)]
283 ebpf: Ebpf,
284 frame_kind: EbpfVar<FrameKind>,
285 relay: Relay,
286}
287
288impl Tap {
289 pub fn builder<I>(interface: I) -> TapBuilder
291 where
292 I: TryInto<Interface>,
293 I::Error: Into<Error>,
294 {
295 TapBuilder::new(interface)
296 }
297}
298
299impl<E: Engine> Tap<E> {
300 pub fn frame_kind(&self) -> FrameKind {
302 self.frame_kind.get().to_owned()
303 }
304
305 pub fn route_channel(
311 &mut self,
312 ) -> Result<(channels::route::Sender, channels::route::Receiver), Error> {
313 if !self.relay.route_enabled {
314 return Err(Error::RouteNotEnabled);
315 }
316
317 self.relay
318 .af_xdp_socket
319 .as_mut()
320 .and_then(|c| c.channel.take())
321 .ok_or(Error::ChannelNotAvailable)
322 }
323
324 pub fn copy_receiver(&mut self) -> Result<channels::copy::Receiver, Error> {
330 if !self.relay.copy_enabled {
331 return Err(Error::CopyNotEnabled);
332 }
333 self.relay
334 .copy_receiver
335 .take()
336 .ok_or(Error::ChannelNotAvailable)
337 }
338
339 pub fn add_rule<R>(&mut self, rule: R) -> Result<RuleId, Error>
346 where
347 R: TryInto<Rule>,
348 R::Error: Into<Error>,
349 {
350 let rule = rule.try_into().map_err(Into::into)?;
351 let rule_id = self.filter.next_rule_id();
352 match rule.action {
353 Action::Copy { .. } if !self.relay.copy_enabled => {
354 return Err(Error::CopyNotEnabled);
355 }
356 Action::Route if !self.relay.route_enabled => {
357 return Err(Error::RouteNotEnabled);
358 }
359 _ => {}
360 }
361 self.engine.add_rule(rule_id, &rule)?;
362 self.filter.add(rule);
363 Ok(rule_id)
364 }
365
366 pub fn remove_rule(&mut self, rule_id: RuleId) -> Result<Rule, Error> {
372 let rule = self
373 .filter
374 .get(rule_id)
375 .ok_or(Error::MissingRule(rule_id))?;
376 self.engine.remove_rule(rule_id, rule)?;
377 self.filter
378 .remove(rule_id)
379 .ok_or(Error::MissingRule(rule_id))
380 }
381
382 pub fn interface(&self) -> &Interface {
384 &self.interface
385 }
386}