ipstack_geph/
lib.rs

1use crate::{
2    packet::IpStackPacketProtocol,
3    stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport},
4};
5use ahash::AHashMap;
6use async_channel::{Receiver, Sender};
7use async_executor::Executor;
8use bytes::Bytes;
9use log::{error, trace};
10use moka::sync::Cache;
11use packet::{NetworkPacket, NetworkTuple};
12use parking_lot::Mutex;
13use std::time::Duration;
14
15pub(crate) type PacketSender = Sender<NetworkPacket>;
16pub(crate) type PacketReceiver = Receiver<NetworkPacket>;
17pub(crate) type SessionCollection = Cache<NetworkTuple, PacketSender>;
18
19mod packet;
20pub mod stream;
21
/// TTL value that acts as an in-band "forget this session" marker: a packet arriving on the
/// internal channel with a TTL of zero makes `process_upstream_recv` remove the matching
/// session instead of forwarding the packet to the device.
const DROP_TTL: u8 = 0;

/// NOTE(review): not referenced in this file; presumably the TTL stamped on packets that the
/// stream implementations generate — confirm against the `stream`/`packet` modules.
const TTL: u8 = 64;
25
/// Tunables for an [`IpStack`].
#[derive(Debug, Clone)]
pub struct IpStackConfig {
    /// MTU passed to every stream the stack creates.
    pub mtu: u16,

    /// Timeout handed to each TCP stream when it is created.
    pub tcp_timeout: Duration,
    /// Timeout handed to each UDP stream when it is created.
    pub udp_timeout: Duration,
}

impl Default for IpStackConfig {
    /// Defaults: MTU of 16384, one-hour TCP timeout, ten-minute UDP timeout.
    fn default() -> Self {
        Self {
            mtu: 16384,

            tcp_timeout: Duration::from_secs(3600),
            udp_timeout: Duration::from_secs(600),
        }
    }
}
43
44pub struct IpStack {
45    accept_receiver: Receiver<IpStackStream>,
46    exec: Executor<'static>,
47}
48
49impl IpStack {
50    pub fn new(
51        config: IpStackConfig,
52        recv_packet: Receiver<Bytes>,
53        send_packet: Sender<Bytes>,
54    ) -> IpStack {
55        let (accept_sender, accept_receiver) = async_channel::unbounded();
56        let exec = Executor::new();
57        exec.spawn(run(config, recv_packet, send_packet, accept_sender))
58            .detach();
59
60        IpStack {
61            accept_receiver,
62            exec,
63        }
64    }
65
66    pub async fn accept(&self) -> anyhow::Result<IpStackStream> {
67        self.exec
68            .run(async { Ok(self.accept_receiver.recv().await?) })
69            .await
70    }
71}
72
73async fn run(
74    config: IpStackConfig,
75    recv_packet: Receiver<Bytes>,
76    send_packet: Sender<Bytes>,
77    accept_sender: Sender<IpStackStream>,
78) -> anyhow::Result<()> {
79    let sessions: SessionCollection = Cache::builder()
80        .time_to_idle(Duration::from_secs(600))
81        .build();
82    let sessions = Mutex::new(sessions);
83
84    let (pkt_sender, pkt_receiver) = async_channel::unbounded::<NetworkPacket>();
85
86    let accept_loop = async {
87        loop {
88            let packet = recv_packet.recv().await?;
89            let mut sessions = sessions.lock();
90            if let Some(stream) =
91                process_device_read(&packet, &mut sessions, pkt_sender.clone(), &config)
92            {
93                let _ = accept_sender.try_send(stream);
94            }
95        }
96    };
97
98    let inject_loop = async {
99        loop {
100            let packet = pkt_receiver.recv().await?;
101            let mut sessions = sessions.lock();
102            process_upstream_recv(packet, &mut sessions, send_packet.clone())?;
103        }
104    };
105
106    futures_lite::future::race(accept_loop, inject_loop).await
107}
108
109fn process_device_read(
110    data: &[u8],
111    sessions: &mut SessionCollection,
112    pkt_sender: PacketSender,
113    config: &IpStackConfig,
114) -> Option<IpStackStream> {
115    let Ok(packet) = NetworkPacket::parse(data) else {
116        return Some(IpStackStream::UnknownNetwork(data.to_owned()));
117    };
118
119    if let IpStackPacketProtocol::Unknown = packet.transport_protocol() {
120        return Some(IpStackStream::UnknownTransport(
121            IpStackUnknownTransport::new(
122                packet.src_addr().ip(),
123                packet.dst_addr().ip(),
124                packet.payload,
125                &packet.ip,
126                config.mtu,
127                pkt_sender,
128            ),
129        ));
130    }
131
132    if let Some(sender) = sessions.get(&packet.network_tuple()) {
133        let _ = sender.try_send(packet);
134        None
135    } else {
136        let (a, b) = create_stream(packet.clone(), config, pkt_sender)?;
137        sessions.insert(packet.network_tuple(), a);
138        Some(b)
139    }
140}
141
142fn create_stream(
143    packet: NetworkPacket,
144    config: &IpStackConfig,
145    pkt_sender: PacketSender,
146) -> Option<(PacketSender, IpStackStream)> {
147    match packet.transport_protocol() {
148        IpStackPacketProtocol::Tcp(h) => {
149            match IpStackTcpStream::new(
150                packet.src_addr(),
151                packet.dst_addr(),
152                h,
153                pkt_sender,
154                config.mtu,
155                config.tcp_timeout,
156            ) {
157                Ok(stream) => Some((stream.stream_sender(), IpStackStream::Tcp(stream))),
158                Err(e) => {
159                    log::debug!("IpStackTcpStream::new failed \"{}\"", e);
160
161                    None
162                }
163            }
164        }
165        IpStackPacketProtocol::Udp => {
166            let stream = IpStackUdpStream::new(
167                packet.src_addr(),
168                packet.dst_addr(),
169                pkt_sender,
170                config.mtu,
171                config.udp_timeout,
172            );
173            let _ = stream.stream_sender().try_send(packet.clone());
174            Some((stream.stream_sender(), IpStackStream::Udp(stream)))
175        }
176        IpStackPacketProtocol::Unknown => {
177            unreachable!()
178        }
179    }
180}
181
182fn process_upstream_recv(
183    packet: NetworkPacket,
184    sessions: &mut SessionCollection,
185    device: Sender<Bytes>,
186) -> anyhow::Result<()> {
187    if packet.ttl() == 0 {
188        sessions.remove(&packet.reverse_network_tuple());
189        return Ok(());
190    }
191    #[allow(unused_mut)]
192    let Ok(mut packet_bytes) = packet.to_bytes() else {
193        trace!("to_bytes error");
194        return Ok(());
195    };
196
197    let _ = device.try_send(packet_bytes.into());
198    // device.flush().await.unwrap();
199
200    Ok(())
201}
202
203pub trait Device {
204    fn read_packet(&self) -> Bytes;
205}