1use crate::config::Config;
2use crate::coroutine::suspender::Suspender;
3use crate::net::event_loop::EventLoop;
4use crate::net::join::JoinHandle;
5use crate::{error, info};
6use once_cell::sync::OnceCell;
7use std::collections::VecDeque;
8use std::ffi::{c_int, c_longlong};
9use std::io::{Error, ErrorKind};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::Duration;
13
14cfg_if::cfg_if! {
15 if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
16 use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
17 use std::ffi::{c_char, c_uint, c_void};
18 }
19}
20
21cfg_if::cfg_if! {
22 if #[cfg(all(windows, feature = "iocp"))] {
23 use std::ffi::c_uint;
24 use windows_sys::core::{PCSTR, PSTR};
25 use windows_sys::Win32::Networking::WinSock::{
26 LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
27 };
28 use windows_sys::Win32::System::IO::OVERLAPPED;
29 }
30}
31
/// C-ABI function pointer that takes one `usize` and returns a `usize`.
///
/// NOTE(review): not referenced anywhere in this file; presumably the callback shape used by
/// the crate's foreign-function entry points — confirm against the callers.
pub type UserFunc = extern "C" fn(usize) -> usize;
34
// Private submodule; nothing in this file references it directly.
// NOTE(review): presumably the readiness selector the event loop polls — confirm.
mod selector;

// Only compiled for the completion-based back ends: io_uring on Linux or IOCP on Windows,
// each behind its own cargo feature.
#[allow(clippy::too_many_arguments)]
#[cfg(any(
    all(target_os = "linux", feature = "io_uring"),
    all(windows, feature = "iocp")
))]
mod operator;

/// Home of the `EventLoop` type that `EventLoops` manages (imported at the top of this file).
#[allow(missing_docs)]
pub mod event_loop;

/// Home of the `JoinHandle` type returned by `EventLoops::submit_task`.
pub mod join;
49
/// Process-wide `EventLoops` singleton.
///
/// Filled exactly once by `EventLoops::init`; the associated functions on `EventLoops` read it
/// through `INSTANCE.get()`.
static INSTANCE: OnceCell<EventLoops> = OnceCell::new();
51
/// A fixed set of started `EventLoop`s plus the state needed to pick one and to stop them all.
#[repr(C)]
#[derive(Debug)]
pub struct EventLoops {
    /// Counter bumped on every round-robin pick; its value modulo `loops.len()` selects the
    /// loop.
    index: AtomicUsize,
    /// The event loops created and started by `EventLoops::new`, in creation order.
    loops: VecDeque<Arc<EventLoop<'static>>>,
    /// Counter and condition variable handed to every loop at construction time.
    /// `EventLoops::stop` blocks on the condition variable while the counter is positive.
    shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
}
60
// SAFETY: NOTE(review) — the justification is not visible in this file. The fields are an
// atomic, a queue of `Arc<EventLoop>` and an `Arc<(Mutex<_>, Condvar)>`, so soundness
// presumably rests on `EventLoop` tolerating access from several threads; confirm against
// the `EventLoop` implementation.
unsafe impl Send for EventLoops {}

// SAFETY: relies on the same (unverified here) assumption as the `Send` impl — TODO confirm.
unsafe impl Sync for EventLoops {}
64
65impl EventLoops {
66 pub fn init(config: &Config) {
68 _ = INSTANCE.get_or_init(|| {
69 #[cfg(feature = "ci")]
70 crate::common::ci::init();
71 let loops = Self::new(
72 config.event_loop_size(),
73 config.stack_size(),
74 config.min_size(),
75 config.max_size(),
76 config.keep_alive_time(),
77 )
78 .expect("init default EventLoops failed !");
79 #[cfg(feature = "log")]
80 let _ = tracing_subscriber::fmt()
81 .with_thread_names(true)
82 .with_line_number(true)
83 .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
84 time::UtcOffset::from_hms(8, 0, 0).expect("create UtcOffset failed !"),
85 time::format_description::well_known::Rfc2822,
86 ))
87 .try_init();
88 info!("open-coroutine init with {config:#?}");
89 loops
90 });
91 }
92
93 pub fn new(
95 event_loop_size: usize,
96 stack_size: usize,
97 min_size: usize,
98 max_size: usize,
99 keep_alive_time: u64,
100 ) -> std::io::Result<Self> {
101 let shared_stop = Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new()));
102 let mut loops = VecDeque::new();
103 for i in 0..event_loop_size {
104 loops.push_back(
105 EventLoop::new(
106 format!("open-coroutine-event-loop-{i}"),
107 i,
108 stack_size,
109 min_size,
110 max_size,
111 keep_alive_time,
112 shared_stop.clone(),
113 )?
114 .start()?,
115 );
116 }
117 Ok(Self {
118 index: AtomicUsize::new(0),
119 loops,
120 shared_stop,
121 })
122 }
123
124 fn round_robin() -> &'static Arc<EventLoop<'static>> {
125 let instance = INSTANCE.get().expect("EventLoops not init !");
126 let index = instance.index.fetch_add(1, Ordering::Release) % instance.loops.len();
127 instance
128 .loops
129 .get(index)
130 .unwrap_or_else(move || panic!("init event-loop-{index} failed!"))
131 }
132
133 fn event_loop() -> &'static EventLoop<'static> {
135 EventLoop::current().unwrap_or_else(|| Self::round_robin())
136 }
137
138 pub fn submit_task(
143 name: Option<String>,
144 func: impl FnOnce(Option<usize>) -> Option<usize> + 'static,
145 param: Option<usize>,
146 priority: Option<c_longlong>,
147 ) -> JoinHandle {
148 let event_loop = Self::round_robin();
149 event_loop
150 .submit_task(name, func, param, priority)
151 .map_or_else(
152 |_| JoinHandle::err(event_loop),
153 |n| JoinHandle::new(event_loop, n.as_str()),
154 )
155 }
156
157 pub fn try_cancel_task(name: &str) {
159 EventLoop::try_cancel_task(name);
160 }
161
162 pub fn submit_co(
167 f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
168 stack_size: Option<usize>,
169 priority: Option<c_longlong>,
170 ) -> std::io::Result<()> {
171 Self::round_robin().submit_co(f, stack_size, priority)
172 }
173
174 pub fn wait_event(timeout: Option<Duration>) -> std::io::Result<()> {
177 Self::event_loop().timed_wait_just(timeout)
178 }
179
180 pub fn wait_read_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
183 let event_loop = Self::event_loop();
184 event_loop.add_read_event(fd)?;
185 event_loop.wait_just(timeout)
186 }
187
188 pub fn wait_write_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
191 let event_loop = Self::event_loop();
192 event_loop.add_write_event(fd)?;
193 event_loop.wait_just(timeout)
194 }
195
196 pub fn del_event(fd: c_int) -> std::io::Result<()> {
199 if let Some(event_loop) = EventLoop::current() {
200 event_loop.del_event(fd)?;
201 } else {
202 let instance = INSTANCE.get().expect("EventLoops not init !");
203 for event_loop in &instance.loops {
204 event_loop.del_event(fd)?;
205 }
206 }
207 Ok(())
208 }
209
210 pub fn del_read_event(fd: c_int) -> std::io::Result<()> {
213 if let Some(event_loop) = EventLoop::current() {
214 event_loop.del_read_event(fd)?;
215 } else {
216 let instance = INSTANCE.get().expect("EventLoops not init !");
217 for event_loop in &instance.loops {
218 event_loop.del_read_event(fd)?;
219 }
220 }
221 Ok(())
222 }
223
224 pub fn del_write_event(fd: c_int) -> std::io::Result<()> {
227 if let Some(event_loop) = EventLoop::current() {
228 event_loop.del_write_event(fd)?;
229 } else {
230 let instance = INSTANCE.get().expect("EventLoops not init !");
231 for event_loop in &instance.loops {
232 event_loop.del_write_event(fd)?;
233 }
234 }
235 Ok(())
236 }
237
238 pub fn stop(wait_time: Duration) -> std::io::Result<()> {
240 if let Some(instance) = INSTANCE.get() {
241 for i in &instance.loops {
242 _ = i.stop(Duration::ZERO);
243 }
244 let (lock, cvar) = &*instance.shared_stop;
245 let guard = lock
246 .lock()
247 .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
248 let result = cvar
249 .wait_timeout_while(guard, wait_time, |stopped| {
250 stopped.load(Ordering::Acquire) > 0
251 })
252 .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
253 if result.1.timed_out() {
254 error!("open-coroutine stop timeout !");
255 return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
256 }
257 #[cfg(all(unix, feature = "preemptive"))]
258 crate::monitor::Monitor::stop();
259 }
260 Ok(())
261 }
262}
263
// Generates one `EventLoops` associated function per syscall signature, compiled only on
// Linux with the `io_uring` feature. The generated function passes its arguments straight
// to the same-named method of `Self::event_loop()`. The `-> type` part of the signature is
// matched but does not appear in the generated code.
macro_rules! impl_io_uring {
    ($name:ident($($param:ident : $param_ty:ty),*) -> $ret:ty) => {
        #[cfg(all(target_os = "linux", feature = "io_uring"))]
        impl EventLoops {
            #[allow(missing_docs)]
            pub fn $name(
                $($param: $param_ty),*
            ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
                Self::event_loop().$name($($param),*)
            }
        }
    };
}
277
// One `EventLoops` associated function per syscall below (Linux + `io_uring` feature only).
// The trailing `-> type` is matched by `impl_io_uring!` but not used in the code it
// generates.
impl_io_uring!(epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut epoll_event) -> c_int);
impl_io_uring!(socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int);
impl_io_uring!(accept(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t) -> c_int);
impl_io_uring!(accept4(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t, flg: c_int) -> c_int);
impl_io_uring!(shutdown(fd: c_int, how: c_int) -> c_int);
impl_io_uring!(connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int);
impl_io_uring!(close(fd: c_int) -> c_int);
impl_io_uring!(recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t);
impl_io_uring!(read(fd: c_int, buf: *mut c_void, count: size_t) -> ssize_t);
impl_io_uring!(pread(fd: c_int, buf: *mut c_void, count: size_t, offset: off_t) -> ssize_t);
impl_io_uring!(readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t);
impl_io_uring!(sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, addr: *const sockaddr, addrlen: socklen_t) -> ssize_t);
impl_io_uring!(write(fd: c_int, buf: *const c_void, count: size_t) -> ssize_t);
impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_t) -> ssize_t);
impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
302
// Generates one `EventLoops` associated function per Winsock signature, compiled only on
// Windows with the `iocp` feature. The generated function passes its arguments straight to
// the same-named method of `Self::event_loop()`. `non_snake_case` is allowed because some
// generated names keep their Win32 spelling. The `-> type` part of the signature is matched
// but does not appear in the generated code.
macro_rules! impl_iocp {
    ($name:ident($($param:ident : $param_ty:ty),*) -> $ret:ty) => {
        #[allow(non_snake_case)]
        #[cfg(all(windows, feature = "iocp"))]
        impl EventLoops {
            #[allow(missing_docs)]
            pub fn $name(
                $($param: $param_ty),*
            ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
                Self::event_loop().$name($($param),*)
            }
        }
    };
}
317
// One `EventLoops` associated function per Winsock call below (Windows + `iocp` feature
// only). The trailing `-> type` is matched by `impl_iocp!` but not used in the code it
// generates.
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);