connect/
connect.rs

1use std::net::TcpListener;
2use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3use std::os::fd::AsRawFd;
4
5use ysockaddr::YSockAddrR;
6
7use io_uring_bearer::Completion;
8use io_uring_bearer::UringBearer;
9
10use capacity::{Capacity, Setting};
11use io_uring_bearer::BearerCapacityKind;
12
13use io_uring_op_connect::Connect;
14use io_uring_opcode::OpExtConnect;
15
/// Capacity plan used by this example.
///
/// A stateless marker type: the actual per-kind limits are supplied by its
/// `Setting<BearerCapacityKind>` implementation.
#[derive(Debug, Clone)]
pub struct MyCapacity;
18
19impl Setting<BearerCapacityKind> for MyCapacity {
20    fn setting(&self, v: &BearerCapacityKind) -> usize {
21        match v {
22            BearerCapacityKind::CoreQueue => 1,
23            BearerCapacityKind::RegisteredFd => 1,
24            BearerCapacityKind::PendingCompletions => 1,
25            BearerCapacityKind::Buffers => 0,
26            BearerCapacityKind::Futexes => 0,
27        }
28    }
29}
30
31fn main() {
32    // Bring up a listener
33    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
34    listener.set_nonblocking(true).unwrap();
35    let listener_port = match listener.local_addr().unwrap() {
36        SocketAddr::V4(addr) => addr.port(),
37        SocketAddr::V6(_) => panic!("IPv4 Requested, IPv6 bound?"),
38    };
39
40    println!("std TcpListener on 127.0.0.1:{listener_port}");
41
42    // Now submit a completion for Connect OpCode against the std listener
43    let my_cap = Capacity::<MyCapacity, BearerCapacityKind>::with_planned(MyCapacity {});
44    let mut bearer = UringBearer::with_capacity(my_cap).unwrap();
45
46    let sock = unsafe { libc::socket(libc::AF_INET, libc::SOCK_STREAM, libc::IPPROTO_TCP) };
47    bearer
48        .io_uring()
49        .submitter()
50        .register_files(&[sock])
51        .unwrap();
52
53    let ysaddr = YSockAddrR::from_sockaddr(SocketAddr::new(
54        IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
55        listener_port,
56    ));
57
58    let _op_idx = bearer
59        .push_connect(Connect::with_ysockaddr_c(0, ysaddr.as_c()).unwrap())
60        .unwrap();
61
62    bearer.submit_and_wait(1).unwrap();
63
64    #[derive(Debug)]
65    struct UserData {
66        e: u32,
67    }
68
69    let mut user = UserData { e: 0 };
70    let mut wait_count = 0;
71
72    loop {
73        bearer
74            .completions(&mut user, |user, entry, rec| match rec {
75                Completion::Connect(c) => {
76                    user.e += 1;
77                    println!(
78                        "Connected Q<{:?}> Fixed_fd<{}> ysaddr<{:?}",
79                        entry,
80                        c.fixed_fd(),
81                        c.ysaddr()
82                    );
83                    // no error
84                    assert_eq!(entry.result(), 0);
85                }
86                _ => panic!("Queue had something else than Connect?"),
87            })
88            .unwrap();
89
90        if user.e != 0 {
91            break;
92        }
93
94        if wait_count > 4 {
95            panic!("wait_count > 5 on Connect example.");
96        }
97
98        wait_count += 1;
99        println!("Waiting for the completion @ {wait_count} ..");
100        let st = std::time::Duration::from_secs(1);
101        std::thread::sleep(st);
102    }
103
104    // Now check that the listener got the connection.
105    match listener.accept() {
106        Ok((in_s, in_a)) => println!(
107            "std TcpListener accepted {} from {}",
108            in_s.as_raw_fd(),
109            in_a
110        ),
111        Err(e) => panic!("std TcpListener resulted in error = {e}"),
112    }
113}