open_coroutine_core/net/
mod.rs

1use crate::config::Config;
2use crate::coroutine::suspender::Suspender;
3use crate::net::event_loop::EventLoop;
4use crate::net::join::JoinHandle;
5use crate::{error, info};
6use once_cell::sync::OnceCell;
7use std::collections::VecDeque;
8use std::ffi::{c_int, c_longlong};
9use std::io::{Error, ErrorKind};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Condvar, Mutex};
12use std::time::Duration;
13
14cfg_if::cfg_if! {
15    if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
16        use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
17        use std::ffi::{c_char, c_uint, c_void};
18    }
19}
20
/// Function pointer type used when providing a C-compatible API.
///
/// Uses the `extern "C"` calling convention, takes one `usize` and returns a `usize`.
pub type UserFunc = extern "C" fn(usize) -> usize;
23
// Private module; nothing from it is referenced directly in this file.
mod selector;

// Only compiled on Linux when the `io_uring` feature is enabled.
#[allow(clippy::too_many_arguments)]
#[cfg(all(target_os = "linux", feature = "io_uring"))]
mod operator;

/// Home of the `EventLoop` type that `EventLoops` manages.
#[allow(missing_docs)]
pub mod event_loop;

/// Task join abstraction and impl.
pub mod join;
35
/// Process-wide `EventLoops` singleton.
///
/// Filled at most once by `EventLoops::init`; the other associated functions read it
/// through `INSTANCE.get()`.
static INSTANCE: OnceCell<EventLoops> = OnceCell::new();
37
/// The manager for `EventLoop`.
///
/// Owns a fixed set of started event loops and hands them out in round-robin order.
#[repr(C)]
#[derive(Debug)]
pub struct EventLoops {
    /// Counter bumped on every round-robin pick; taken modulo `loops.len()` to choose a loop.
    index: AtomicUsize,
    /// The event loops created and started by `EventLoops::new`.
    loops: VecDeque<Arc<EventLoop<'static>>>,
    /// Counter plus condition variable handed to every event loop at construction.
    /// `EventLoops::stop` waits on it until the counter is zero or the timeout expires.
    shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
}
46
// NOTE(review): manual assertion that `EventLoops` may be moved to another thread.
// The fields visible here are an `AtomicUsize`, a `VecDeque<Arc<EventLoop>>` and an
// `Arc<(Mutex<_>, Condvar)>`, so soundness hinges on `EventLoop` itself — TODO confirm.
unsafe impl Send for EventLoops {}

// NOTE(review): manual assertion that `&EventLoops` may be shared between threads.
// As above, this presumably relies on `EventLoop` tolerating concurrent access — TODO confirm.
unsafe impl Sync for EventLoops {}
50
51impl EventLoops {
52    /// Init the `EventLoops`.
53    pub fn init(config: &Config) {
54        _ = INSTANCE.get_or_init(|| {
55            #[cfg(feature = "ci")]
56            crate::common::ci::init();
57            let loops = Self::new(
58                config.event_loop_size(),
59                config.stack_size(),
60                config.min_size(),
61                config.max_size(),
62                config.keep_alive_time(),
63            )
64            .expect("init default EventLoops failed !");
65            #[cfg(feature = "log")]
66            let _ = tracing_subscriber::fmt()
67                .with_thread_names(true)
68                .with_line_number(true)
69                .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
70                    time::UtcOffset::from_hms(8, 0, 0).expect("create UtcOffset failed !"),
71                    time::format_description::well_known::Rfc2822,
72                ))
73                .try_init();
74            info!("open-coroutine init with {config:#?}");
75            loops
76        });
77    }
78
79    /// Create a new `EventLoops`.
80    pub fn new(
81        event_loop_size: usize,
82        stack_size: usize,
83        min_size: usize,
84        max_size: usize,
85        keep_alive_time: u64,
86    ) -> std::io::Result<Self> {
87        let shared_stop = Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new()));
88        let mut loops = VecDeque::new();
89        for i in 0..event_loop_size {
90            loops.push_back(
91                EventLoop::new(
92                    format!("open-coroutine-event-loop-{i}"),
93                    i,
94                    stack_size,
95                    min_size,
96                    max_size,
97                    keep_alive_time,
98                    shared_stop.clone(),
99                )?
100                .start()?,
101            );
102        }
103        Ok(Self {
104            index: AtomicUsize::new(0),
105            loops,
106            shared_stop,
107        })
108    }
109
110    fn round_robin() -> &'static Arc<EventLoop<'static>> {
111        let instance = INSTANCE.get().expect("EventLoops not init !");
112        let index = instance.index.fetch_add(1, Ordering::Release) % instance.loops.len();
113        instance
114            .loops
115            .get(index)
116            .unwrap_or_else(move || panic!("init event-loop-{index} failed!"))
117    }
118
119    /// Get a `EventLoop`, prefer current.
120    fn event_loop() -> &'static EventLoop<'static> {
121        EventLoop::current().unwrap_or_else(|| Self::round_robin())
122    }
123
124    /// Submit a new task to event-loop.
125    ///
126    /// Allow multiple threads to concurrently submit task to the pool,
127    /// but only allow one thread to execute scheduling.
128    pub fn submit_task(
129        name: Option<String>,
130        func: impl FnOnce(Option<usize>) -> Option<usize> + 'static,
131        param: Option<usize>,
132        priority: Option<c_longlong>,
133    ) -> JoinHandle {
134        let event_loop = Self::round_robin();
135        event_loop
136            .submit_task(name, func, param, priority)
137            .map_or_else(
138                |_| JoinHandle::err(event_loop),
139                |n| JoinHandle::new(event_loop, n.as_str()),
140            )
141    }
142
143    /// Submit a new coroutine to event-loop.
144    ///
145    /// Allow multiple threads to concurrently submit coroutine to the pool,
146    /// but only allow one thread to execute scheduling.
147    pub fn submit_co(
148        f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
149        stack_size: Option<usize>,
150        priority: Option<c_longlong>,
151    ) -> std::io::Result<()> {
152        Self::round_robin().submit_co(f, stack_size, priority)
153    }
154
155    /// Waiting for read or write events to occur.
156    /// This method can only be used in coroutines.
157    pub fn wait_event(timeout: Option<Duration>) -> std::io::Result<()> {
158        Self::event_loop().timed_wait_just(timeout)
159    }
160
161    /// Waiting for a read event to occur.
162    /// This method can only be used in coroutines.
163    pub fn wait_read_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
164        let event_loop = Self::event_loop();
165        event_loop.add_read_event(fd)?;
166        event_loop.wait_just(timeout)
167    }
168
169    /// Waiting for a write event to occur.
170    /// This method can only be used in coroutines.
171    pub fn wait_write_event(fd: c_int, timeout: Option<Duration>) -> std::io::Result<()> {
172        let event_loop = Self::event_loop();
173        event_loop.add_write_event(fd)?;
174        event_loop.wait_just(timeout)
175    }
176
177    /// Remove read and write event interests.
178    /// This method can only be used in coroutines.
179    pub fn del_event(fd: c_int) -> std::io::Result<()> {
180        if let Some(event_loop) = EventLoop::current() {
181            event_loop.del_event(fd)?;
182        } else {
183            let instance = INSTANCE.get().expect("EventLoops not init !");
184            for event_loop in &instance.loops {
185                event_loop.del_event(fd)?;
186            }
187        }
188        Ok(())
189    }
190
191    /// Remove read event interest.
192    /// This method can only be used in coroutines.
193    pub fn del_read_event(fd: c_int) -> std::io::Result<()> {
194        if let Some(event_loop) = EventLoop::current() {
195            event_loop.del_read_event(fd)?;
196        } else {
197            let instance = INSTANCE.get().expect("EventLoops not init !");
198            for event_loop in &instance.loops {
199                event_loop.del_read_event(fd)?;
200            }
201        }
202        Ok(())
203    }
204
205    /// Remove write event interest.
206    /// This method can only be used in coroutines.
207    pub fn del_write_event(fd: c_int) -> std::io::Result<()> {
208        if let Some(event_loop) = EventLoop::current() {
209            event_loop.del_write_event(fd)?;
210        } else {
211            let instance = INSTANCE.get().expect("EventLoops not init !");
212            for event_loop in &instance.loops {
213                event_loop.del_write_event(fd)?;
214            }
215        }
216        Ok(())
217    }
218
219    /// Stop all `EventLoop`.
220    pub fn stop(wait_time: Duration) -> std::io::Result<()> {
221        if let Some(instance) = INSTANCE.get() {
222            for i in &instance.loops {
223                _ = i.stop(Duration::ZERO);
224            }
225            let (lock, cvar) = &*instance.shared_stop;
226            let guard = lock
227                .lock()
228                .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
229            let result = cvar
230                .wait_timeout_while(guard, wait_time, |stopped| {
231                    stopped.load(Ordering::Acquire) > 0
232                })
233                .map_err(|_| Error::new(ErrorKind::TimedOut, "wait failed !"))?;
234            if result.1.timed_out() {
235                error!("open-coroutine stop timeout !");
236                return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
237            }
238            #[cfg(all(unix, feature = "preemptive"))]
239            crate::monitor::Monitor::stop();
240        }
241        Ok(())
242    }
243}
244
/// Generates an `EventLoops` associated function named after a syscall.
///
/// The generated function forwards its arguments, unchanged and in order, to the
/// same-named method on the loop returned by `EventLoops::event_loop()`. It is only
/// compiled on Linux with the `io_uring` feature enabled.
///
/// The type written after `->` in the invocation is accepted but never expanded;
/// every generated function returns
/// `std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>>`.
macro_rules! impl_io_uring {
    ( $name: ident($($param: ident: $param_ty: ty),*) -> $ret: ty ) => {
        #[cfg(all(target_os = "linux", feature = "io_uring"))]
        impl EventLoops {
            #[allow(missing_docs)]
            pub fn $name(
                $($param: $param_ty),*
            ) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
                let event_loop = Self::event_loop();
                event_loop.$name($($param),*)
            }
        }
    }
}
258
// One forwarding function per supported syscall, written with the syscall's own
// parameter list. The return type after `->` is not expanded by `impl_io_uring!`,
// which is why `ssize_t` does not need to be imported above.
impl_io_uring!(epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut epoll_event) -> c_int);
impl_io_uring!(socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int);
impl_io_uring!(accept(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t) -> c_int);
impl_io_uring!(accept4(fd: c_int, addr: *mut sockaddr, len: *mut socklen_t, flg: c_int) -> c_int);
impl_io_uring!(shutdown(fd: c_int, how: c_int) -> c_int);
impl_io_uring!(connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int);
impl_io_uring!(close(fd: c_int) -> c_int);
impl_io_uring!(recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t);
impl_io_uring!(read(fd: c_int, buf: *mut c_void, count: size_t) -> ssize_t);
impl_io_uring!(pread(fd: c_int, buf: *mut c_void, count: size_t, offset: off_t) -> ssize_t);
impl_io_uring!(readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t);
impl_io_uring!(sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, addr: *const sockaddr, addrlen: socklen_t) -> ssize_t);
impl_io_uring!(write(fd: c_int, buf: *const c_void, count: size_t) -> ssize_t);
impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_t) -> ssize_t);
impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);