snap_dataplane/tunnel_gateway/
dispatcher.rs1use std::net::SocketAddr;
17
18use ana_gotatun::packet::Packet;
19use sciparse::{core::view::View, packet::view::ScionPacketView};
20use tokio::sync::mpsc::{Receiver, Sender, channel, error::TrySendError};
21
22use crate::{
23 dispatcher::Dispatcher,
24 tunnel_gateway::{gateway::PacketPool, metrics::TunnelGatewayDispatcherMetrics},
25};
26
/// Capacity handed to the bounded `tokio` mpsc channel that carries outbound packets.
const OUTBOUND_QUEUE_SIZE: usize = 1_024;
/// Argument passed to `PacketBufPool::new` when the dispatcher is constructed.
// NOTE(review): presumably the initial number of pooled buffers — confirm against the pool's docs.
const BUFFER_POOL_INIT_SIZE: usize = 1_024;
29
30#[derive(Clone)]
32pub struct TunnelGatewayDispatcher {
33 metrics: TunnelGatewayDispatcherMetrics,
34 pool: PacketPool,
35 outbound_queue: Sender<(SocketAddr, Packet)>,
39}
40
41impl TunnelGatewayDispatcher {
42 pub fn new(metrics: TunnelGatewayDispatcherMetrics) -> (Self, TunnelGatewayDispatcherReceiver) {
52 let pool = ana_gotatun::packet::PacketBufPool::new(BUFFER_POOL_INIT_SIZE);
53 let (tx, rx) = channel(OUTBOUND_QUEUE_SIZE);
54 let myself = Self {
55 metrics,
56 pool: pool.clone(),
57 outbound_queue: tx,
58 };
59
60 let rx = TunnelGatewayDispatcherReceiver {
61 outbound_queue: rx,
62 pool,
63 };
64 (myself, rx)
65 }
66}
67
68impl Dispatcher for TunnelGatewayDispatcher {
69 fn try_dispatch(&self, packet: &ScionPacketView) {
70 let classification = match packet.classify() {
71 Ok(c) => c,
72 Err(e) => {
73 self.metrics.invalid_packets_errors.inc();
74 tracing::debug!(error=%e, "Failed to classify packet");
75 return;
76 }
77 };
78
79 let sock_addr = match classification
80 .dst_socket_addr()
81 .and_then(|a| a.socket_addr())
82 {
83 Some(addr) => addr,
84 None => {
85 self.metrics.invalid_packets_errors.inc();
86 tracing::debug!("Could not deduce destination socket address from packet");
87 return;
88 }
89 };
90
91 let raw_bytes = packet.as_bytes();
92 let mut pooled_packet = self.pool.get();
93 pooled_packet.as_mut()[..raw_bytes.len()].copy_from_slice(raw_bytes);
94 pooled_packet.truncate(raw_bytes.len());
95
96 match self.outbound_queue.try_send((sock_addr, pooled_packet)) {
97 Ok(_) => self.metrics.dispatch_queue_size.inc(),
98 Err(TrySendError::Closed(_)) => self.metrics.closed_dispatch_queue_errors.inc(),
99 Err(TrySendError::Full(_)) => self.metrics.full_dispatch_queue_errors.inc(),
100 }
101 }
102}
103
104pub struct TunnelGatewayDispatcherReceiver {
106 pub(crate) pool: PacketPool,
107 pub(crate) outbound_queue: Receiver<(SocketAddr, Packet)>,
108}