1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::{
    cell::UnsafeCell,
    convert::TryFrom,
    fs::File,
    io,
    net::{TcpListener, TcpStream},
    ops::Neg,
    os::unix::io::{AsRawFd, FromRawFd},
    sync::{
        atomic::{
            AtomicU32, AtomicU64,
            Ordering::{Acquire, Relaxed, Release},
        },
        Arc, Condvar, Mutex,
    },
};

use super::{
    pair, AsIoVec, AsIoVecMut, Completion, Filler, FromCqe,
    Measure, M,
};

mod config;
mod constants;
mod cq;
mod in_flight;
mod kernel_types;
mod sq;
mod syscall;
mod ticket_queue;
mod uring;

pub(crate) use {
    constants::*,
    cq::Cq,
    in_flight::InFlight,
    kernel_types::{
        io_uring_cqe, io_uring_params, io_uring_sqe,
    },
    sq::Sq,
    syscall::{enter, setup},
    ticket_queue::TicketQueue,
};

pub use {
    config::Config,
    uring::{Rio, Uring},
};

/// Specifies whether `io_uring` should run operations
/// in a specific order.
///
/// By default, independent operations may run in any
/// order so that they can be completed as quickly as
/// possible. This can be constrained either by
/// submitting chains of `Link` operations, which are
/// executed one after the other, or by using `Drain`,
/// which causes all previously submitted operations
/// to complete first.
///
/// The enum is a plain, fieldless value type, so it can
/// be copied, compared for equality, and used as a key
/// in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Ordering {
    /// No ordering requirements.
    None,
    /// `Ordering::Link` causes the next submitted
    /// operation to wait until this one finishes.
    /// Useful for things like file copy,
    /// fsync-after-write, or proxies.
    Link,
    /// `Ordering::Drain` causes all previously
    /// submitted operations to complete before
    /// this one begins.
    Drain,
}

/// Maps `size` bytes of the ring file descriptor `ring_fd`,
/// starting at `offset`, into this process's address space.
///
/// The mapping is requested as readable and writable with
/// `MAP_SHARED | MAP_POPULATE`, and the kernel picks the
/// address (a null hint is passed).
///
/// # Errors
///
/// Returns the last OS error when `mmap` fails. An `ENOMEM`
/// failure is replaced by an `io::ErrorKind::Other` error
/// whose message points the user at the memlock rlimit.
fn uring_mmap(
    size: usize,
    ring_fd: i32,
    offset: i64,
) -> io::Result<*mut libc::c_void> {
    // SAFETY: the address hint is null and `MAP_FIXED` is not
    // set, so the call cannot replace an existing mapping. The
    // returned pointer is validated below before it is handed
    // to the caller.
    #[allow(unsafe_code)]
    let ptr = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            size,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED | libc::MAP_POPULATE,
            ring_fd,
            offset,
        )
    };

    if ptr.is_null() || ptr == libc::MAP_FAILED {
        // Capture errno right away, before anything else can
        // overwrite it.
        let os_err = io::Error::last_os_error();

        // ENOMEM gets a friendlier, actionable message; every
        // other failure is passed through unchanged.
        let err = if os_err.raw_os_error() == Some(libc::ENOMEM) {
            io::Error::new(
                io::ErrorKind::Other,
                "Not enough lockable memory. You probably \
                 need to raise the memlock rlimit, which \
                 often defaults to a pretty low number.",
            )
        } else {
            os_err
        };
        return Err(err);
    }

    Ok(ptr)
}

/// Builds a `TcpStream` out of a completion queue entry by
/// interpreting the entry's `res` field as a raw file
/// descriptor.
impl FromCqe for TcpStream {
    fn from_cqe(cqe: io_uring_cqe) -> TcpStream {
        let raw_fd = cqe.res;

        // SAFETY (assumed, confirm against callers): `res` is
        // taken to be an open socket descriptor that nothing
        // else owns. No check is performed here, so a negative
        // (error) result would be wrapped as-is.
        #[allow(unsafe_code)]
        unsafe {
            Self::from_raw_fd(raw_fd)
        }
    }
}