tokio 1.37.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;

/// Verifies that cooperative budgeting is applied to UDP socket operations.
///
/// # Design
/// One socket floods another with packets over the loopback interface.
///
/// On Linux, a packet sent over loopback is pushed through the entire network stack and lands in the
/// receiver's buffer while the send system call is still in progress, because the softirq chain
/// triggered by the send covers virtually the whole lifecycle of the packet inside the kernel.
///
/// Consequently neither socket ever sees an EWOULDBLOCK, which leaves budget exhaustion as the only
/// reason for the send/recv loop to yield.
///
/// A background task repeatedly bumps a counter and then yields, which tells us how many times the
/// sockets yielded. Every packet costs two budget events (one send, one recv) and a budget holds 128
/// events, so one yield is expected per 64 packets.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
    const BUDGET: usize = 128;
    const N_ITERATIONS: usize = 1024;

    const PACKET: &[u8] = b"Hello, world";
    const PACKET_LEN: usize = 12;

    // Guard against the hand-written length drifting away from the payload.
    assert_eq!(
        PACKET_LEN,
        PACKET.len(),
        "Defect in test, programmer can't do math"
    );

    // Both endpoints use IPv4 on localhost with a dynamically assigned port.
    let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    // Connect the two sockets to each other.
    let receiver_addr = receiver.local_addr().unwrap();
    sender.connect(receiver_addr).await.unwrap();
    let sender_addr = sender.local_addr().unwrap();
    receiver.connect(sender_addr).await.unwrap();

    let yield_count = Arc::new(AtomicUsize::default());
    let background_count = Arc::clone(&yield_count);

    // This yield happens before the counting task exists, so it is never counted.
    tokio::task::yield_now().await;

    // Counting task: one increment each time it runs, then hand control back.
    tokio::spawn(async move {
        loop {
            background_count.fetch_add(1, Ordering::SeqCst);

            tokio::task::yield_now().await;
        }
    });

    for _ in 0..N_ITERATIONS {
        sender.send(PACKET).await.unwrap();

        let mut received = [0u8; PACKET_LEN];
        let received_len = receiver.recv(&mut received).await.unwrap();

        // Each recv must return exactly one full packet with the expected contents.
        assert_eq!(
            PACKET_LEN, received_len,
            "Defect in test case, received unexpected result from socket"
        );
        assert_eq!(
            PACKET, &received,
            "Defect in test case, received unexpected result from socket"
        );
    }

    // Two budget events per iteration means one yield per BUDGET / 2 iterations.
    let expected_yields = N_ITERATIONS / (BUDGET / 2);
    assert_eq!(expected_yields, yield_count.load(Ordering::SeqCst));
}