rustp2p_transport/task/
mod.rs1use bytes::BytesMut;
2use rustp2p::pipe::{Pipe, PipeLine, PipeWriter, RecvError};
3use std::io;
4use std::net::Ipv4Addr;
5use tcp_ip::{IpStackRecv, IpStackSend};
6
7pub async fn start(
8 mtu: u16,
9 pipe: Pipe,
10 ip_stack_send: IpStackSend,
11 ip_stack_recv: IpStackRecv,
12) -> io::Result<()> {
13 let pipe_writer = pipe.writer().clone();
14 tokio::spawn(async move {
15 if let Err(e) = pipe_accept_handle(pipe, ip_stack_send).await {
16 if e.kind() != io::ErrorKind::BrokenPipe {
17 log::warn!("pipe_accept {e:?}");
18 }
19 }
20 });
21 tokio::spawn(async move {
22 ip_stack_recv_handle(mtu as usize, ip_stack_recv, &pipe_writer).await;
23 _ = pipe_writer.shutdown();
24 });
25 Ok(())
26}
27
28async fn ip_stack_recv_handle(
29 mtu: usize,
30 mut ip_stack_recv: IpStackRecv,
31 pipe_writer: &PipeWriter,
32) {
33 let mut bufs = Vec::with_capacity(128);
34 let mut sizes = vec![0; 128];
35 for _ in 0..128 {
36 bufs.push(BytesMut::zeroed(mtu))
37 }
38 while let Ok(num) = ip_stack_recv.recv_ip_packet(&mut bufs, &mut sizes).await {
39 for index in 0..num {
40 let buf = &bufs[index];
41 let len = sizes[index];
42 let dst = if buf[0] >> 4 != 4 {
43 if let Some(ipv6_packet) = pnet_packet::ipv6::Ipv6Packet::new(&buf[..len]) {
44 let last: [u8; 4] = ipv6_packet.get_destination().octets()[12..]
45 .try_into()
46 .unwrap();
47 Ipv4Addr::from(last)
48 } else {
49 continue;
50 }
51 } else {
52 Ipv4Addr::new(buf[16], buf[17], buf[18], buf[19])
53 };
54 let mut send_packet = pipe_writer.allocate_send_packet();
55 send_packet.set_payload(buf);
56 if let Err(e) = pipe_writer.send_packet_to(send_packet, &dst.into()).await {
57 log::warn!("send_packet_to {e:?},dst={dst}");
58 }
59 }
60 }
61}
62
63async fn pipe_accept_handle(mut pipe: Pipe, ip_stack_send: IpStackSend) -> io::Result<()> {
64 loop {
65 let line = pipe.accept().await?;
66 let ip_stack_send = ip_stack_send.clone();
67 tokio::spawn(async move {
68 let addr = line.remote_addr();
69 if let Err(e) = pipe_line_recv_handle(line, ip_stack_send).await {
70 log::warn!("pipe_line_recv {e:?} {addr:?}");
71 }
72 });
73 }
74}
75async fn pipe_line_recv_handle(
76 mut pipe_line: PipeLine,
77 ip_stack_send: IpStackSend,
78) -> io::Result<()> {
79 let mut list = Vec::with_capacity(16);
80
81 loop {
82 let result = match pipe_line.recv_multi(&mut list).await {
83 Ok(rs) => rs,
84 Err(e) => {
85 return match e {
86 RecvError::Done => Ok(()),
87 RecvError::Io(e) => Err(e),
88 }
89 }
90 };
91 if let Err(e) = result {
92 log::warn!("recv_data_handle {e:?} {:?}", pipe_line.remote_addr());
93 continue;
94 }
95 for data in list.drain(..) {
96 if let Err(e) = ip_stack_send.send_ip_packet(data.payload()).await {
97 log::warn!("{e:?} {:?}", data.route_key());
98 }
99 }
100 }
101}