rustp2p_transport/task/
mod.rs

1use bytes::BytesMut;
2use rustp2p::pipe::{Pipe, PipeLine, PipeWriter, RecvError};
3use std::io;
4use std::net::Ipv4Addr;
5use tcp_ip::{IpStackRecv, IpStackSend};
6
7pub async fn start(
8    mtu: u16,
9    pipe: Pipe,
10    ip_stack_send: IpStackSend,
11    ip_stack_recv: IpStackRecv,
12) -> io::Result<()> {
13    let pipe_writer = pipe.writer().clone();
14    tokio::spawn(async move {
15        if let Err(e) = pipe_accept_handle(pipe, ip_stack_send).await {
16            if e.kind() != io::ErrorKind::BrokenPipe {
17                log::warn!("pipe_accept {e:?}");
18            }
19        }
20    });
21    tokio::spawn(async move {
22        ip_stack_recv_handle(mtu as usize, ip_stack_recv, &pipe_writer).await;
23        _ = pipe_writer.shutdown();
24    });
25    Ok(())
26}
27
28async fn ip_stack_recv_handle(
29    mtu: usize,
30    mut ip_stack_recv: IpStackRecv,
31    pipe_writer: &PipeWriter,
32) {
33    let mut bufs = Vec::with_capacity(128);
34    let mut sizes = vec![0; 128];
35    for _ in 0..128 {
36        bufs.push(BytesMut::zeroed(mtu))
37    }
38    while let Ok(num) = ip_stack_recv.recv_ip_packet(&mut bufs, &mut sizes).await {
39        for index in 0..num {
40            let buf = &bufs[index];
41            let len = sizes[index];
42            let dst = if buf[0] >> 4 != 4 {
43                if let Some(ipv6_packet) = pnet_packet::ipv6::Ipv6Packet::new(&buf[..len]) {
44                    let last: [u8; 4] = ipv6_packet.get_destination().octets()[12..]
45                        .try_into()
46                        .unwrap();
47                    Ipv4Addr::from(last)
48                } else {
49                    continue;
50                }
51            } else {
52                Ipv4Addr::new(buf[16], buf[17], buf[18], buf[19])
53            };
54            let mut send_packet = pipe_writer.allocate_send_packet();
55            send_packet.set_payload(buf);
56            if let Err(e) = pipe_writer.send_packet_to(send_packet, &dst.into()).await {
57                log::warn!("send_packet_to {e:?},dst={dst}");
58            }
59        }
60    }
61}
62
63async fn pipe_accept_handle(mut pipe: Pipe, ip_stack_send: IpStackSend) -> io::Result<()> {
64    loop {
65        let line = pipe.accept().await?;
66        let ip_stack_send = ip_stack_send.clone();
67        tokio::spawn(async move {
68            let addr = line.remote_addr();
69            if let Err(e) = pipe_line_recv_handle(line, ip_stack_send).await {
70                log::warn!("pipe_line_recv {e:?} {addr:?}");
71            }
72        });
73    }
74}
75async fn pipe_line_recv_handle(
76    mut pipe_line: PipeLine,
77    ip_stack_send: IpStackSend,
78) -> io::Result<()> {
79    let mut list = Vec::with_capacity(16);
80
81    loop {
82        let result = match pipe_line.recv_multi(&mut list).await {
83            Ok(rs) => rs,
84            Err(e) => {
85                return match e {
86                    RecvError::Done => Ok(()),
87                    RecvError::Io(e) => Err(e),
88                }
89            }
90        };
91        if let Err(e) = result {
92            log::warn!("recv_data_handle {e:?} {:?}", pipe_line.remote_addr());
93            continue;
94        }
95        for data in list.drain(..) {
96            if let Err(e) = ip_stack_send.send_ip_packet(data.payload()).await {
97                log::warn!("{e:?} {:?}", data.route_key());
98            }
99        }
100    }
101}