1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};

use crate::config::NucleiConfig;
use once_cell::sync::OnceCell;

use super::syscore::*;
use super::waker::*;
use crate::spawn_blocking;
use crate::sys::IoBackend;

pub use super::handle::*;

///
/// Concrete proactor instance
pub struct Proactor(pub(crate) SysProactor);
unsafe impl Send for Proactor {}
unsafe impl Sync for Proactor {}

static mut PROACTOR: OnceCell<Proactor> = OnceCell::new();

impl Proactor {
    /// Returns a reference to the proactor.
    pub fn get() -> &'static Proactor {
        unsafe {
            PROACTOR.get_or_init(|| {
                Proactor(
                    SysProactor::new(NucleiConfig::default())
                        .expect("cannot initialize IO backend"),
                )
            })
        }
    }

    /// Builds a proactor instance with given config and returns a reference to it.
    pub fn with_config(config: NucleiConfig) -> &'static Proactor {
        unsafe {
            let proactor =
                Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend"));
            PROACTOR
                .set(proactor)
                .map_err(|e| "Proactor instance not being able to set.")
                .unwrap();

            PROACTOR.wait()
        }
    }

    /// Wakes the thread waiting on proactor.
    pub fn wake(&self) {
        self.0.wake().expect("failed to wake thread");
    }

    /// Wait for completion of IO object
    pub fn wait(&self, max_event_size: usize, duration: Option<Duration>) -> io::Result<usize> {
        self.0.wait(max_event_size, duration)
    }

    /// Get the IO backend that is used with Nuclei's proactor.
    pub fn backend() -> IoBackend {
        BACKEND
    }

    /// Get underlying proactor instance.
    pub(crate) fn inner(&self) -> &SysProactor {
        &self.0
    }

    #[cfg(all(feature = "iouring", target_os = "linux"))]
    /// Get IO_URING backend probes
    pub fn ring_params(&self) -> &rustix_uring::Parameters {
        unsafe { IO_URING.as_ref().unwrap().params() }
    }
}

///
/// IO driver that drives underlying event systems
pub fn drive<T>(future: impl Future<Output = T>) -> T {
    let p = Proactor::get();
    let waker = waker_fn(move || {
        p.wake();
    });

    let cx = &mut Context::from_waker(&waker);
    futures::pin_mut!(future);

    let driver = spawn_blocking(move || loop {
        let _ = p.wait(1, None);
    });

    futures::pin_mut!(driver);

    loop {
        if let Poll::Ready(val) = future.as_mut().poll(cx) {
            return val;
        }

        cx.waker().wake_by_ref();

        // TODO: (vcq): we don't need this.
        // let _duration = Duration::from_millis(1);
        let _ = driver.as_mut().poll(cx);
    }
}

#[cfg(test)]
#[cfg(target_os = "linux")]
mod proactor_tests {
    use crate::config::{IoUringConfiguration, NucleiConfig};
    use crate::Proactor;

    #[test]
    #[ignore]
    fn proactor_with_defaults() {
        let old = Proactor::get();

        let osq = old.0.sq.lock();
        let olen = osq.capacity();
        drop(osq);
        dbg!(olen);

        assert_eq!(olen, 2048);
    }

    #[test]
    fn proactor_with_config_rollup() {
        let config = NucleiConfig {
            iouring: IoUringConfiguration {
                queue_len: 10,
                sqpoll_wake_interval: Some(11),
                per_numa_bounded_worker_count: Some(12),
                per_numa_unbounded_worker_count: Some(13),
                ..IoUringConfiguration::default()
            },
        };
        let new = Proactor::with_config(config);
        let old = Proactor::get();

        let nsq = new.0.sq.lock();
        let nlen = nsq.capacity();
        drop(nsq);

        let osq = old.0.sq.lock();
        let olen = osq.capacity();
        drop(osq);

        dbg!(nlen);
        dbg!(olen);

        assert_eq!(nlen, olen);
        assert_eq!(nlen, 16);
        assert_eq!(olen, 16);
    }
}