open_coroutine_core/net/
mod.rs

1use crate::config::Config;
2use crate::coroutine::suspender::Suspender;
3use crate::net::event_loop::EventLoop;
4use crate::net::join::JoinHandle;
5use crate::{error, info};
6use once_cell::sync::OnceCell;
7use std::collections::VecDeque;
8use std::ffi::{c_int, c_longlong};
9use std::io::{Error, ErrorKind};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::Duration;
13
14cfg_if::cfg_if! {
15    if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
16        use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
17        use std::ffi::{c_char, c_uint, c_void};
18    }
19}
20
21cfg_if::cfg_if! {
22    if #[cfg(all(windows, feature = "iocp"))] {
23        use std::ffi::c_uint;
24        use windows_sys::core::{PCSTR, PSTR};
25        use windows_sys::Win32::Networking::WinSock::{
26            LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
27        };
28        use windows_sys::Win32::System::IO::OVERLAPPED;
29    }
30}
31
32/// 做C兼容时会用到
33pub type UserFunc = extern "C" fn(usize) -> usize;
34
35mod selector;
36
37#[allow(clippy::too_many_arguments)]
38#[cfg(any(
39    all(target_os = "linux", feature = "io_uring"),
40    all(windows, feature = "iocp")
41))]
42mod operator;
43
44#[allow(missing_docs)]
45pub mod event_loop;
46
47/// Task join abstraction and impl.
48pub mod join;
49
50static INSTANCE: OnceCell<EventLoops> = OnceCell::new();
51
52/// The manager for `EventLoop`.
53#[repr(C)]
54#[derive(Debug)]
55pub struct EventLoops {
56    index: AtomicUsize,
57    loops: VecDeque<Arc<EventLoop<'static>>>,
58    shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
59}
60
61unsafe impl Send for EventLoops {}
62
63unsafe impl Sync for EventLoops {}
64
65impl EventLoops {
66    /// Init the `EventLoops`.
67    pub fn init(config: &Config) {
68        _ = INSTANCE.get_or_init(|| {
69            #[cfg(feature = "ci")]
70            crate::common::ci::init();
71            let loops = Self::new(
72                config.event_loop_size(),
73                config.stack_size(),
74                config.min_size(),
75                config.max_size(),
76                config.keep_alive_time(),
77            )
78            .expect("init default EventLoops failed !");
79            #[cfg(feature = "log")]
80            let _ = tracing_subscriber::fmt()
81                .with_thread_names(true)
82                .with_line_number(true)
83                .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
84                    time::UtcOffset::from_hms(8, 0, 0).expect("create UtcOffset failed !"),
85                    time::format_description::well_known::Rfc2822,
86                ))
87                .try_init();
88            info!("open-coroutine init with {config:#?}");
89            loops
90        });
91    }
92
93    /// Create a new `EventLoops`.
94    pub fn new(
95        event_loop_size: usize,
96        stack_size: usize,
97        min_size: usize,
98        max_size: usize,
99        keep_alive_time: u64,
100    ) -> std::io::Result<Self> {
101        let shared_stop = Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new()));
102        let mut loops = VecDeque::new();
103        for i in 0..event_loop_size {
104            loops.push_back(
105                EventLoop::new(
106                    format!("open-coroutine-event-loop-{i}"),
107                    i,
108                    stack_size,
109                    min_size,
110                    max_size,
111                    keep_alive_time,
112                    shared_stop.clone(),
113                )?
114                .start()?,
115            );
116        }
117        Ok(Self {
118            index: AtomicUsize::new(0),
119            loops,
120            shared_stop,
121        })
122    }
123
124    fn round_robin() -> &'static Arc<EventLoop<'static>> {
125        let instance = INSTANCE.get().expect("EventLoops not init !");
126        let index = instance.index.fetch_add(1, Ordering::Release) % instance.loops.len();
127        instance
128            .loops
129            .get(index)
130            .unwrap_or_else(move || panic!("init event-loop-{index} failed!"))
131    }
132
133    /// Get a `EventLoop`, prefer current.
134    fn event_loop() -> &'static EventLoop<'static> {
135        EventLoop::current().unwrap_or_else(|| Self::round_robin())
136    }
137
138    /// Submit a new task to event-loop.
139    ///
140    /// Allow multiple threads to concurrently submit task to the pool,
141    /// but only allow one thread to execute scheduling.
142    pub fn submit_task(
143        name: Option<String>,
144        func: impl FnOnce(Option<usize>) -> Option<usize> + 'static,
145        param: Option<usize>,
146        priority: Option<c_longlong>,
147    ) -> JoinHandle {
148        let event_loop = Self::round_robin();
149        event_loop
150            .submit_task(name, func, param, priority)
151            .map_or_else(
152                |_| JoinHandle::err(event_loop),
153                |n| JoinHandle::new(event_loop, n.as_str()),
154            )
155    }
156
157    /// Try to cancel a task from event-loop.
158    pub fn try_cancel_task(name: &str) {
159        EventLoop::try_cancel_task(name);
160    }
161
162    /// Submit a new coroutine to event-loop.
163    ///
164    /// Allow multiple threads to concurrently submit coroutine to the pool,
165    /// but only allow one thread to execute scheduling.
166    pub fn submit_co(
167        f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
168        stack_size: Option<usize>,
169        priority: Option<c_longlong>,
170    ) -> std::io::Result<()> {
171        Self::round_robin().submit_co(f, stack_size, priority)
172    }
173
174    /// Waiting for read or write events to occur.
175    /// This method can only be used in coroutines.
176    pub fn wait_event(timeout: Option<Duration>) -> std::io::Result<()> {
177        Self::event_loop().timed_wait_just(timeout)
178    }
179
180    /// Waiting for a read event to occur.
181    /// This method can only be used in coroutines.
182    pub fn wait_read_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
183        let event_loop = Self::event_loop();
184        event_loop.add_read_event(fd)?;
185        event_loop.wait_just(timeout)
186    }
187
188    /// Waiting for a write event to occur.
189    /// This method can only be used in coroutines.
190    pub fn wait_write_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
191        let event_loop = Self::event_loop();
192        event_loop.add_write_event(fd)?;
193        event_loop.wait_just(timeout)
194    }
195
196    /// Remove read and write event interests.
197    /// This method can only be used in coroutines.
198    pub fn del_event(fd: c_int) -> std::io::Result<()> {
199        if let Some(event_loop) = EventLoop::current() {
200            event_loop.del_event(fd)?;
201        } else {
202            let instance = INSTANCE.get().expect("EventLoops not init !");
203            for event_loop in &instance.loops {
204                event_loop.del_event(fd)?;
205            }
206        }
207        Ok(())
208    }
209
210    /// Remove read event interest.
211    /// This method can only be used in coroutines.
212    pub fn del_read_event(fd: c_int) -> std::io::Result<()> {
213        if let Some(event_loop) = EventLoop::current() {
214            event_loop.del_read_event(fd)?;
215        } else {
216            let instance = INSTANCE.get().expect("EventLoops not init !");
217            for event_loop in &instance.loops {
218                event_loop.del_read_event(fd)?;
219            }
220        }
221        Ok(())
222    }
223
224    /// Remove write event interest.
225    /// This method can only be used in coroutines.
226    pub fn del_write_event(fd: c_int) -> std::io::Result<()> {
227        if let Some(event_loop) = EventLoop::current() {
228            event_loop.del_write_event(fd)?;
229        } else {
230            let instance = INSTANCE.get().expect("EventLoops not init !");
231            for event_loop in &instance.loops {
232                event_loop.del_write_event(fd)?;
233            }
234        }
235        Ok(())
236    }
237
238    /// Stop all `EventLoop`.
239    pub fn stop(wait_time: Duration) -> std::io::Result<()> {
240        if let Some(instance) = INSTANCE.get() {
241            for i in &instance.loops {
242                _ = i.stop(Duration::ZERO);
243            }
244            let (lock, cvar) = &*instance.shared_stop;
245            let guard = lock
246                .lock()
247                .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
248            let result = cvar
249                .wait_timeout_while(guard, wait_time, |stopped| {
250                    stopped.load(Ordering::Acquire) > 0
251                })
252                .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
253            if result.1.timed_out() {
254                error!("open-coroutine stop timeout !");
255                return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
256            }
257            #[cfg(all(unix, feature = "preemptive"))]
258            crate::monitor::Monitor::stop();
259        }
260        Ok(())
261    }
262}
263
264macro_rules! impl_io_uring {
265    ( $syscall: ident($($arg: ident: $arg_type: ty),*) -> $result: ty ) => {
266        #[cfg(all(target_os = "linux", feature = "io_uring"))]
267        impl EventLoops {
268            #[allow(missing_docs)]
269            pub fn $syscall(
270                $($arg: $arg_type),*
271            ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
272                Self::event_loop().$syscall($($arg, )*)
273            }
274        }
275    }
276}
277
278impl_io_uring!(epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut epoll_event) -> c_int);
279impl_io_uring!(socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int);
280impl_io_uring!(accept(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t) -> c_int);
281impl_io_uring!(accept4(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t, flg: c_int) -> c_int);
282impl_io_uring!(shutdown(fd: c_int, how: c_int) -> c_int);
283impl_io_uring!(connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int);
284impl_io_uring!(close(fd: c_int) -> c_int);
285impl_io_uring!(recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t);
286impl_io_uring!(read(fd: c_int, buf: *mut c_void, count: size_t) -> ssize_t);
287impl_io_uring!(pread(fd: c_int, buf: *mut c_void, count: size_t, offset: off_t) -> ssize_t);
288impl_io_uring!(readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
289impl_io_uring!(preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
290impl_io_uring!(recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t);
291impl_io_uring!(send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t);
292impl_io_uring!(sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, addr: *const sockaddr, addrlen: socklen_t) -> ssize_t);
293impl_io_uring!(write(fd: c_int, buf: *const c_void, count: size_t) -> ssize_t);
294impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_t) -> ssize_t);
295impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
296impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
297impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
298impl_io_uring!(fsync(fd: c_int) -> c_int);
299impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
300impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
301impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
302
303macro_rules! impl_iocp {
304    ( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
305        #[allow(non_snake_case)]
306        #[cfg(all(windows, feature = "iocp"))]
307        impl EventLoops {
308            #[allow(missing_docs)]
309            pub fn $syscall(
310                $($arg: $arg_type),*
311            ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
312                Self::event_loop().$syscall($($arg, )*)
313            }
314        }
315    }
316}
317
318impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
319impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
320impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
321impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
322impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);