1use crate::config::Config;
2use crate::coroutine::suspender::Suspender;
3use crate::net::event_loop::EventLoop;
4use crate::net::join::JoinHandle;
5use crate::{error, info};
6use once_cell::sync::OnceCell;
7use std::collections::VecDeque;
8use std::ffi::{c_int, c_longlong};
9use std::io::{Error, ErrorKind};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::Duration;
13
14cfg_if::cfg_if! {
15 if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
16 use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
17 use std::ffi::{c_char, c_uint, c_void};
18 }
19}
20
21pub type UserFunc = extern "C" fn(usize) -> usize;
23
24mod selector;
25
26#[allow(clippy::too_many_arguments)]
27#[cfg(all(target_os = "linux", feature = "io_uring"))]
28mod operator;
29
30#[allow(missing_docs)]
31pub mod event_loop;
32
33pub mod join;
35
36static INSTANCE: OnceCell<EventLoops> = OnceCell::new();
37
38#[repr(C)]
40#[derive(Debug)]
41pub struct EventLoops {
42 index: AtomicUsize,
43 loops: VecDeque<Arc<EventLoop<'static>>>,
44 shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
45}
46
47unsafe impl Send for EventLoops {}
48
49unsafe impl Sync for EventLoops {}
50
51impl EventLoops {
52 pub fn init(config: &Config) {
54 _ = INSTANCE.get_or_init(|| {
55 #[cfg(feature = "ci")]
56 crate::common::ci::init();
57 let loops = Self::new(
58 config.event_loop_size(),
59 config.stack_size(),
60 config.min_size(),
61 config.max_size(),
62 config.keep_alive_time(),
63 )
64 .expect("init default EventLoops failed !");
65 #[cfg(feature = "log")]
66 let _ = tracing_subscriber::fmt()
67 .with_thread_names(true)
68 .with_line_number(true)
69 .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
70 time::UtcOffset::from_hms(8, 0, 0).expect("create UtcOffset failed !"),
71 time::format_description::well_known::Rfc2822,
72 ))
73 .try_init();
74 info!("open-coroutine init with {config:#?}");
75 loops
76 });
77 }
78
79 pub fn new(
81 event_loop_size: usize,
82 stack_size: usize,
83 min_size: usize,
84 max_size: usize,
85 keep_alive_time: u64,
86 ) -> std::io::Result<Self> {
87 let shared_stop = Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new()));
88 let mut loops = VecDeque::new();
89 for i in 0..event_loop_size {
90 loops.push_back(
91 EventLoop::new(
92 format!("open-coroutine-event-loop-{i}"),
93 i,
94 stack_size,
95 min_size,
96 max_size,
97 keep_alive_time,
98 shared_stop.clone(),
99 )?
100 .start()?,
101 );
102 }
103 Ok(Self {
104 index: AtomicUsize::new(0),
105 loops,
106 shared_stop,
107 })
108 }
109
110 fn round_robin() -> &'static Arc<EventLoop<'static>> {
111 let instance = INSTANCE.get().expect("EventLoops not init !");
112 let index = instance.index.fetch_add(1, Ordering::Release) % instance.loops.len();
113 instance
114 .loops
115 .get(index)
116 .unwrap_or_else(move || panic!("init event-loop-{index} failed!"))
117 }
118
119 fn event_loop() -> &'static EventLoop<'static> {
121 EventLoop::current().unwrap_or_else(|| Self::round_robin())
122 }
123
124 pub fn submit_task(
129 name: Option<String>,
130 func: impl FnOnce(Option<usize>) -> Option<usize> + 'static,
131 param: Option<usize>,
132 priority: Option<c_longlong>,
133 ) -> JoinHandle {
134 let event_loop = Self::round_robin();
135 event_loop
136 .submit_task(name, func, param, priority)
137 .map_or_else(
138 |_| JoinHandle::err(event_loop),
139 |n| JoinHandle::new(event_loop, n.as_str()),
140 )
141 }
142
143 pub fn submit_co(
148 f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
149 stack_size: Option<usize>,
150 priority: Option<c_longlong>,
151 ) -> std::io::Result<()> {
152 Self::round_robin().submit_co(f, stack_size, priority)
153 }
154
155 pub fn wait_event(timeout: Option<Duration>) -> std::io::Result<()> {
158 Self::event_loop().timed_wait_just(timeout)
159 }
160
161 pub fn wait_read_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
164 let event_loop = Self::event_loop();
165 event_loop.add_read_event(fd)?;
166 event_loop.wait_just(timeout)
167 }
168
169 pub fn wait_write_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
172 let event_loop = Self::event_loop();
173 event_loop.add_write_event(fd)?;
174 event_loop.wait_just(timeout)
175 }
176
177 pub fn del_event(fd: c_int) -> std::io::Result<()> {
180 if let Some(event_loop) = EventLoop::current() {
181 event_loop.del_event(fd)?;
182 } else {
183 let instance = INSTANCE.get().expect("EventLoops not init !");
184 for event_loop in &instance.loops {
185 event_loop.del_event(fd)?;
186 }
187 }
188 Ok(())
189 }
190
191 pub fn del_read_event(fd: c_int) -> std::io::Result<()> {
194 if let Some(event_loop) = EventLoop::current() {
195 event_loop.del_read_event(fd)?;
196 } else {
197 let instance = INSTANCE.get().expect("EventLoops not init !");
198 for event_loop in &instance.loops {
199 event_loop.del_read_event(fd)?;
200 }
201 }
202 Ok(())
203 }
204
205 pub fn del_write_event(fd: c_int) -> std::io::Result<()> {
208 if let Some(event_loop) = EventLoop::current() {
209 event_loop.del_write_event(fd)?;
210 } else {
211 let instance = INSTANCE.get().expect("EventLoops not init !");
212 for event_loop in &instance.loops {
213 event_loop.del_write_event(fd)?;
214 }
215 }
216 Ok(())
217 }
218
219 pub fn stop(wait_time: Duration) -> std::io::Result<()> {
221 if let Some(instance) = INSTANCE.get() {
222 for i in &instance.loops {
223 _ = i.stop(Duration::ZERO);
224 }
225 let (lock, cvar) = &*instance.shared_stop;
226 let guard = lock
227 .lock()
228 .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
229 let result = cvar
230 .wait_timeout_while(guard, wait_time, |stopped| {
231 stopped.load(Ordering::Acquire) > 0
232 })
233 .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
234 if result.1.timed_out() {
235 error!("open-coroutine stop timeout !");
236 return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
237 }
238 #[cfg(all(unix, feature = "preemptive"))]
239 crate::monitor::Monitor::stop();
240 }
241 Ok(())
242 }
243}
244
245macro_rules! impl_io_uring {
246 ( $syscall: ident($($arg: ident: $arg_type: ty),*) -> $result: ty ) => {
247 #[cfg(all(target_os = "linux", feature = "io_uring"))]
248 impl EventLoops {
249 #[allow(missing_docs)]
250 pub fn $syscall(
251 $($arg: $arg_type),*
252 ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
253 Self::event_loop().$syscall($($arg, )*)
254 }
255 }
256 }
257}
258
259impl_io_uring!(epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut epoll_event) -> c_int);
260impl_io_uring!(socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int);
261impl_io_uring!(accept(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t) -> c_int);
262impl_io_uring!(accept4(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t, flg: c_int) -> c_int);
263impl_io_uring!(shutdown(fd: c_int, how: c_int) -> c_int);
264impl_io_uring!(connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int);
265impl_io_uring!(close(fd: c_int) -> c_int);
266impl_io_uring!(recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t);
267impl_io_uring!(read(fd: c_int, buf: *mut c_void, count: size_t) -> ssize_t);
268impl_io_uring!(pread(fd: c_int, buf: *mut c_void, count: size_t, offset: off_t) -> ssize_t);
269impl_io_uring!(readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
270impl_io_uring!(preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
271impl_io_uring!(recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t);
272impl_io_uring!(send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t);
273impl_io_uring!(sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, addr: *const sockaddr, addrlen: socklen_t) -> ssize_t);
274impl_io_uring!(write(fd: c_int, buf: *const c_void, count: size_t) -> ssize_t);
275impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_t) -> ssize_t);
276impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
277impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
278impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
279impl_io_uring!(fsync(fd: c_int) -> c_int);
280impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
281impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
282impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);