1use crate::{
2 packet::IpStackPacketProtocol,
3 stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport},
4};
5use ahash::AHashMap;
6use async_channel::{Receiver, Sender};
7use async_executor::Executor;
8use bytes::Bytes;
9use log::{error, trace};
10use moka::sync::Cache;
11use packet::{NetworkPacket, NetworkTuple};
12use parking_lot::Mutex;
13use std::time::Duration;
14
15pub(crate) type PacketSender = Sender<NetworkPacket>;
16pub(crate) type PacketReceiver = Receiver<NetworkPacket>;
17pub(crate) type SessionCollection = Cache<NetworkTuple, PacketSender>;
18
19mod packet;
20pub mod stream;
21
22const DROP_TTL: u8 = 0;
23
24const TTL: u8 = 64;
25
26pub struct IpStackConfig {
27 pub mtu: u16,
28
29 pub tcp_timeout: Duration,
30 pub udp_timeout: Duration,
31}
32
33impl Default for IpStackConfig {
34 fn default() -> Self {
35 IpStackConfig {
36 mtu: 16384,
37
38 tcp_timeout: Duration::from_secs(3600),
39 udp_timeout: Duration::from_secs(600),
40 }
41 }
42}
43
44pub struct IpStack {
45 accept_receiver: Receiver<IpStackStream>,
46 exec: Executor<'static>,
47}
48
49impl IpStack {
50 pub fn new(
51 config: IpStackConfig,
52 recv_packet: Receiver<Bytes>,
53 send_packet: Sender<Bytes>,
54 ) -> IpStack {
55 let (accept_sender, accept_receiver) = async_channel::unbounded();
56 let exec = Executor::new();
57 exec.spawn(run(config, recv_packet, send_packet, accept_sender))
58 .detach();
59
60 IpStack {
61 accept_receiver,
62 exec,
63 }
64 }
65
66 pub async fn accept(&self) -> anyhow::Result<IpStackStream> {
67 self.exec
68 .run(async { Ok(self.accept_receiver.recv().await?) })
69 .await
70 }
71}
72
73async fn run(
74 config: IpStackConfig,
75 recv_packet: Receiver<Bytes>,
76 send_packet: Sender<Bytes>,
77 accept_sender: Sender<IpStackStream>,
78) -> anyhow::Result<()> {
79 let sessions: SessionCollection = Cache::builder()
80 .time_to_idle(Duration::from_secs(600))
81 .build();
82 let sessions = Mutex::new(sessions);
83
84 let (pkt_sender, pkt_receiver) = async_channel::unbounded::<NetworkPacket>();
85
86 let accept_loop = async {
87 loop {
88 let packet = recv_packet.recv().await?;
89 let mut sessions = sessions.lock();
90 if let Some(stream) =
91 process_device_read(&packet, &mut sessions, pkt_sender.clone(), &config)
92 {
93 let _ = accept_sender.try_send(stream);
94 }
95 }
96 };
97
98 let inject_loop = async {
99 loop {
100 let packet = pkt_receiver.recv().await?;
101 let mut sessions = sessions.lock();
102 process_upstream_recv(packet, &mut sessions, send_packet.clone())?;
103 }
104 };
105
106 futures_lite::future::race(accept_loop, inject_loop).await
107}
108
109fn process_device_read(
110 data: &[u8],
111 sessions: &mut SessionCollection,
112 pkt_sender: PacketSender,
113 config: &IpStackConfig,
114) -> Option<IpStackStream> {
115 let Ok(packet) = NetworkPacket::parse(data) else {
116 return Some(IpStackStream::UnknownNetwork(data.to_owned()));
117 };
118
119 if let IpStackPacketProtocol::Unknown = packet.transport_protocol() {
120 return Some(IpStackStream::UnknownTransport(
121 IpStackUnknownTransport::new(
122 packet.src_addr().ip(),
123 packet.dst_addr().ip(),
124 packet.payload,
125 &packet.ip,
126 config.mtu,
127 pkt_sender,
128 ),
129 ));
130 }
131
132 if let Some(sender) = sessions.get(&packet.network_tuple()) {
133 let _ = sender.try_send(packet);
134 None
135 } else {
136 let (a, b) = create_stream(packet.clone(), config, pkt_sender)?;
137 sessions.insert(packet.network_tuple(), a);
138 Some(b)
139 }
140}
141
142fn create_stream(
143 packet: NetworkPacket,
144 config: &IpStackConfig,
145 pkt_sender: PacketSender,
146) -> Option<(PacketSender, IpStackStream)> {
147 match packet.transport_protocol() {
148 IpStackPacketProtocol::Tcp(h) => {
149 match IpStackTcpStream::new(
150 packet.src_addr(),
151 packet.dst_addr(),
152 h,
153 pkt_sender,
154 config.mtu,
155 config.tcp_timeout,
156 ) {
157 Ok(stream) => Some((stream.stream_sender(), IpStackStream::Tcp(stream))),
158 Err(e) => {
159 log::debug!("IpStackTcpStream::new failed \"{}\"", e);
160
161 None
162 }
163 }
164 }
165 IpStackPacketProtocol::Udp => {
166 let stream = IpStackUdpStream::new(
167 packet.src_addr(),
168 packet.dst_addr(),
169 pkt_sender,
170 config.mtu,
171 config.udp_timeout,
172 );
173 let _ = stream.stream_sender().try_send(packet.clone());
174 Some((stream.stream_sender(), IpStackStream::Udp(stream)))
175 }
176 IpStackPacketProtocol::Unknown => {
177 unreachable!()
178 }
179 }
180}
181
182fn process_upstream_recv(
183 packet: NetworkPacket,
184 sessions: &mut SessionCollection,
185 device: Sender<Bytes>,
186) -> anyhow::Result<()> {
187 if packet.ttl() == 0 {
188 sessions.remove(&packet.reverse_network_tuple());
189 return Ok(());
190 }
191 #[allow(unused_mut)]
192 let Ok(mut packet_bytes) = packet.to_bytes() else {
193 trace!("to_bytes error");
194 return Ok(());
195 };
196
197 let _ = device.try_send(packet_bytes.into());
198 Ok(())
201}
202
203pub trait Device {
204 fn read_packet(&self) -> Bytes;
205}