open_coroutine_core/common/
mod.rs

1#[cfg(target_os = "linux")]
2use std::ffi::c_int;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6/// CI tools.
7#[cfg(feature = "ci")]
8pub mod ci;
9
10/// Constants.
11pub mod constants;
12
13/// Check <https://www.rustwiki.org.cn/en/reference/introduction.html> for help information.
14pub(crate) mod macros;
15
16/// `BeanFactory` impls.
17pub mod beans;
18
19/// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To
20/// find an available task, it might do the following:
21///
22/// 1. Try popping one task from the local worker queue.
23/// 2. Try popping and stealing tasks from another local worker queue.
24/// 3. Try popping and stealing a batch of tasks from the global injector queue.
25///
26/// A queue implementation of work-stealing strategy:
27///
28/// # Examples
29///
30/// ```
31/// use open_coroutine_core::common::work_steal::WorkStealQueue;
32///
33/// let queue = WorkStealQueue::new(2, 64);
34/// queue.push(6);
35/// queue.push(7);
36///
37/// let local0 = queue.local_queue();
38/// local0.push(2);
39/// local0.push(3);
40/// local0.push(4);
41/// local0.push(5);
42///
43/// let local1 = queue.local_queue();
44/// local1.push(0);
45/// local1.push(1);
46/// for i in 0..8 {
47///     assert_eq!(local1.pop(), Some(i));
48/// }
49/// assert_eq!(local0.pop(), None);
50/// assert_eq!(local1.pop(), None);
51/// assert_eq!(queue.pop(), None);
52/// ```
53///
54#[doc = include_str!("../../docs/en/work-steal.md")]
55pub mod work_steal;
56
57/// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To
58/// find an available task, it might do the following:
59///
60/// 1. Try popping one task from the local worker queue.
61/// 2. Try popping and stealing tasks from another local worker queue.
62/// 3. Try popping and stealing a batch of tasks from the global injector queue.
63///
/// A priority-ordered queue implementation of the work-stealing strategy (tasks are pushed with `push_with_priority`):
65///
66/// # Examples
67///
68/// ```
69/// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue;
70///
71/// let queue = OrderedWorkStealQueue::new(2, 64);
72/// for i in 6..8 {
73///     queue.push_with_priority(i, i);
74/// }
75/// let local0 = queue.local_queue();
76/// for i in 2..6 {
77///    local0.push_with_priority(i, i);
78/// }
79/// let local1 = queue.local_queue();
80/// for i in 0..2 {
81///     local1.push_with_priority(i, i);
82/// }
83/// for i in 0..8 {
84///     assert_eq!(local1.pop(), Some(i));
85/// }
86/// assert_eq!(local0.pop(), None);
87/// assert_eq!(local1.pop(), None);
88/// assert_eq!(queue.pop(), None);
89/// ```
90///
91#[doc = include_str!("../../docs/en/ordered-work-steal.md")]
92pub mod ordered_work_steal;
93
#[cfg(target_os = "linux")]
extern "C" {
    /// Foreign function that reports the kernel version code as a C `int`.
    ///
    /// It is only declared here; the definition is linked in from elsewhere.
    /// NOTE(review): presumably the value uses the same encoding as
    /// `kernel_version` (the tests compare the two) — confirm against the C side.
    fn linux_version_code() -> c_int;
}
98
/// Pack a `major.patchlevel.sublevel` triple into a single linux kernel version number.
///
/// The result is `(major << 16) + (patchlevel << 8) + sublevel`, where any
/// `sublevel` above 255 is clamped to 255 so it never spills into the
/// `patchlevel` bits.
#[must_use]
#[cfg(target_os = "linux")]
pub fn kernel_version(major: c_int, patchlevel: c_int, sublevel: c_int) -> c_int {
    let clamped_sublevel = sublevel.min(255);
    (major << 16) + (patchlevel << 8) + clamped_sublevel
}
105
/// Get current linux kernel version number.
///
/// This is a thin wrapper that returns whatever the foreign
/// `linux_version_code` function reports, unchanged.
#[must_use]
#[cfg(target_os = "linux")]
pub fn current_kernel_version() -> c_int {
    // SAFETY: the foreign function takes no arguments and returns a plain `c_int`,
    // so no Rust-side invariants are involved in the call itself.
    // NOTE(review): soundness relies on the linked C symbol matching this declaration — confirm.
    unsafe { linux_version_code() }
}
112
/// Get the current wall-clock time in nanoseconds since `UNIX_EPOCH`.
///
/// If the nanosecond count does not fit into a `u64`, `u64::MAX` is returned
/// instead of truncating.
///
/// # Panics
/// if the time is before `UNIX_EPOCH`
#[must_use]
pub fn now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // The previous message was a success message with an unformatted `{}`;
        // state the actual failure instead.
        .expect("system time is before UNIX_EPOCH (1970-01-01 00:00:00 UTC)");
    // `as_nanos` yields a `u128`; saturate rather than wrap on conversion.
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
127
128/// current ns time add `dur`.
129#[must_use]
130pub fn get_timeout_time(dur: Duration) -> u64 {
131    u64::try_from(dur.as_nanos())
132        .map(|d| d.saturating_add(now()))
133        .unwrap_or(u64::MAX)
134}
135
/// Make the total time into slices.
///
/// Returns as many full `slice` durations as fit strictly inside `total`,
/// followed by one final entry holding the remainder (which is between one
/// nanosecond and `slice` long). The entries always sum up to `total`.
///
/// Edge cases:
/// - a zero `total` yields an empty vector;
/// - a zero `slice` cannot make progress, so `total` is returned as a single
///   slice instead of looping forever.
#[must_use]
pub fn get_slices(total: Duration, slice: Duration) -> Vec<Duration> {
    if total.is_zero() {
        return Vec::new();
    }
    if slice.is_zero() {
        // Subtracting a zero slice would never shrink the remainder.
        return vec![total];
    }
    let mut result = Vec::new();
    let mut left_total = total;
    while left_total > slice {
        result.push(slice);
        // Cannot underflow: the loop condition guarantees `left_total > slice`.
        left_total -= slice;
    }
    result.push(left_total);
    result
}
153
154/// Get the page size of this system.
155pub fn page_size() -> usize {
156    static PAGE_SIZE: AtomicUsize = AtomicUsize::new(0);
157    let mut ret = PAGE_SIZE.load(Ordering::Relaxed);
158    if ret == 0 {
159        unsafe {
160            cfg_if::cfg_if! {
161                if #[cfg(windows)] {
162                    let mut info = std::mem::zeroed();
163                    windows_sys::Win32::System::SystemInformation::GetSystemInfo(&raw mut info);
164                    ret = usize::try_from(info.dwPageSize).expect("get page size failed");
165                } else {
166                    ret = usize::try_from(libc::sysconf(libc::_SC_PAGESIZE)).expect("get page size failed");
167                }
168            }
169        }
170        PAGE_SIZE.store(ret, Ordering::Relaxed);
171    }
172    ret
173}
174
175/// Recommended read zone size for coroutines.
176pub fn default_red_zone() -> usize {
177    static DEFAULT_RED_ZONE: AtomicUsize = AtomicUsize::new(0);
178    let mut ret = DEFAULT_RED_ZONE.load(Ordering::Relaxed);
179    if ret == 0 {
180        cfg_if::cfg_if! {
181            if #[cfg(windows)] {
182                ret = 32 * 1024 + page_size() * 3;
183            } else {
184                ret = 16 * 1024 + page_size();
185            }
186        }
187        DEFAULT_RED_ZONE.store(ret, Ordering::Relaxed);
188    }
189    ret
190}
191
/// A timeout-bounded thread blocker built on a [`std::sync::Mutex`] and a
/// [`std::sync::Condvar`].
///
/// The guarded `bool` is the "ready" flag: [`CondvarBlocker::notify`] sets it,
/// and [`CondvarBlocker::block`] waits for it (or for the timeout) and then
/// clears it again.
#[repr(C)]
#[derive(Debug, Default)]
pub struct CondvarBlocker {
    // `true` means a notification is pending and has not been consumed yet.
    mutex: std::sync::Mutex<bool>,
    condvar: std::sync::Condvar,
}

impl CondvarBlocker {
    /// Block current thread for a while.
    ///
    /// Returns as soon as a notification is pending, or after `dur` has
    /// elapsed, whichever comes first. The ready flag is cleared before
    /// returning, so every notification wakes at most one `block` call.
    ///
    /// # Panics
    /// if the internal mutex is poisoned.
    pub fn block(&self, dur: Duration) {
        let guard = self.mutex.lock().expect("lock failed");
        let (mut ready, _timed_out) = self
            .condvar
            .wait_timeout_while(guard, dur, |ready| !*ready)
            .expect("lock failed");
        // Clear the flag under the guard handed back by the wait. Dropping that
        // guard and locking again would let a concurrent `notify` slip in
        // between and have its wake-up erased.
        *ready = false;
    }

    /// Notify by other thread.
    ///
    /// # Panics
    /// if the internal mutex is poisoned.
    pub fn notify(&self) {
        let mut condition = self.mutex.lock().expect("lock failed");
        // true means the condition is ready, the other thread can continue.
        *condition = true;
        self.condvar.notify_one();
    }
}
220
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Instant;

    /// A notification sent from another thread must wake `block` long before
    /// its three second timeout expires.
    #[test]
    fn blocker() {
        let blocker = Arc::new(CondvarBlocker::default());
        let notifier = Arc::clone(&blocker);
        // Use the monotonic clock: subtracting wall-clock readings can
        // underflow if the system time is adjusted while the test runs.
        let start = Instant::now();
        let handle = std::thread::spawn(move || {
            notifier.notify();
        });
        blocker.block(Duration::from_secs(3));
        assert!(start.elapsed() < Duration::from_secs(1));
        handle.join().expect("notifier thread panicked");
    }

    /// The running kernel must report a version newer than 2.7.0.
    #[cfg(target_os = "linux")]
    #[test]
    fn test() {
        assert!(current_kernel_version() > kernel_version(2, 7, 0));
    }
}