open_coroutine_core/common/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#[cfg(target_os = "linux")]
use std::ffi::c_int;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// CI tools.
#[cfg(feature = "ci")]
pub mod ci;

/// Constants.
pub mod constants;

/// Check <https://www.rustwiki.org.cn/en/reference/introduction.html> for help information.
pub(crate) mod macros;

/// `BeanFactory` impls.
pub mod beans;

/// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To
/// find an available task, it might do the following:
///
/// 1. Try popping one task from the local worker queue.
/// 2. Try popping and stealing tasks from another local worker queue.
/// 3. Try popping and stealing a batch of tasks from the global injector queue.
///
/// A queue implementation of work-stealing strategy:
///
/// # Examples
///
/// ```
/// use open_coroutine_core::common::work_steal::WorkStealQueue;
///
/// let queue = WorkStealQueue::new(2, 64);
/// queue.push(6);
/// queue.push(7);
///
/// let local0 = queue.local_queue();
/// local0.push(2);
/// local0.push(3);
/// local0.push(4);
/// local0.push(5);
///
/// let local1 = queue.local_queue();
/// local1.push(0);
/// local1.push(1);
/// for i in 0..8 {
///     assert_eq!(local1.pop(), Some(i));
/// }
/// assert_eq!(local0.pop(), None);
/// assert_eq!(local1.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
///
pub mod work_steal;

/// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To
/// find an available task, it might do the following:
///
/// 1. Try popping one task from the local worker queue.
/// 2. Try popping and stealing tasks from another local worker queue.
/// 3. Try popping and stealing a batch of tasks from the global injector queue.
///
/// A queue implementation of work-stealing strategy:
///
/// # Examples
///
/// ```
/// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue;
///
/// let queue = OrderedWorkStealQueue::new(2, 64);
/// for i in 6..8 {
///     queue.push_with_priority(i, i);
/// }
/// let local0 = queue.local_queue();
/// for i in 2..6 {
///    local0.push_with_priority(i, i);
/// }
/// let local1 = queue.local_queue();
/// for i in 0..2 {
///     local1.push_with_priority(i, i);
/// }
/// for i in 0..8 {
///     assert_eq!(local1.pop(), Some(i));
/// }
/// assert_eq!(local0.pop(), None);
/// assert_eq!(local1.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
///
pub mod ordered_work_steal;

#[cfg(target_os = "linux")]
extern "C" {
    fn linux_version_code() -> c_int;
}

/// Get linux kernel version number.
#[must_use]
#[cfg(target_os = "linux")]
pub fn kernel_version(major: c_int, patchlevel: c_int, sublevel: c_int) -> c_int {
    ((major) << 16) + ((patchlevel) << 8) + if (sublevel) > 255 { 255 } else { sublevel }
}

/// Get current linux kernel version number.
#[must_use]
#[cfg(target_os = "linux")]
pub fn current_kernel_version() -> c_int {
    unsafe { linux_version_code() }
}

/// get the current wall clock in ns
///
/// # Panics
/// if the time is before `UNIX_EPOCH`
#[must_use]
pub fn now() -> u64 {
    u64::try_from(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("1970-01-01 00:00:00 UTC was {} seconds ago!")
            .as_nanos(),
    )
    .unwrap_or(u64::MAX)
}

/// current ns time add `dur`.
#[must_use]
pub fn get_timeout_time(dur: Duration) -> u64 {
    u64::try_from(dur.as_nanos())
        .map(|d| d.saturating_add(now()))
        .unwrap_or(u64::MAX)
}

/// Make the total time into slices.
#[must_use]
pub fn get_slices(total: Duration, slice: Duration) -> Vec<Duration> {
    let mut result = Vec::new();
    if Duration::ZERO == total {
        return result;
    }
    let mut left_total = total;
    while left_total > slice {
        result.push(slice);
        if let Some(new_left_total) = left_total.checked_sub(slice) {
            left_total = new_left_total;
        }
    }
    result.push(left_total);
    result
}

/// Get the page size of this system.
pub fn page_size() -> usize {
    static PAGE_SIZE: AtomicUsize = AtomicUsize::new(0);
    let mut ret = PAGE_SIZE.load(Ordering::Relaxed);
    if ret == 0 {
        unsafe {
            cfg_if::cfg_if! {
                if #[cfg(windows)] {
                    let mut info = std::mem::zeroed();
                    windows_sys::Win32::System::SystemInformation::GetSystemInfo(&mut info);
                    ret = usize::try_from(info.dwPageSize).expect("get page size failed");
                } else {
                    ret = usize::try_from(libc::sysconf(libc::_SC_PAGESIZE)).expect("get page size failed");
                }
            }
        }
        PAGE_SIZE.store(ret, Ordering::Relaxed);
    }
    ret
}

/// Recommended read zone size for coroutines.
pub fn default_red_zone() -> usize {
    static DEFAULT_RED_ZONE: AtomicUsize = AtomicUsize::new(0);
    let mut ret = DEFAULT_RED_ZONE.load(Ordering::Relaxed);
    if ret == 0 {
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                ret = 32 * 1024 + page_size() * 3;
            } else {
                ret = 16 * 1024 + page_size();
            }
        }
        DEFAULT_RED_ZONE.store(ret, Ordering::Relaxed);
    }
    ret
}

#[allow(missing_docs)]
#[repr(C)]
#[derive(Debug, Default)]
pub struct CondvarBlocker {
    mutex: std::sync::Mutex<bool>,
    condvar: std::sync::Condvar,
}

impl CondvarBlocker {
    /// Block current thread for a while.
    pub fn block(&self, dur: Duration) {
        _ = self.condvar.wait_timeout_while(
            self.mutex.lock().expect("lock failed"),
            dur,
            |&mut condition| !condition,
        );
        let mut condition = self.mutex.lock().expect("lock failed");
        *condition = false;
    }

    /// Notify by other thread.
    pub fn notify(&self) {
        let mut condition = self.mutex.lock().expect("lock failed");
        // true means the condition is ready, the other thread can continue.
        *condition = true;
        self.condvar.notify_one();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn blocker() {
        let start = now();
        let blocker = Arc::new(CondvarBlocker::default());
        let clone = blocker.clone();
        _ = std::thread::spawn(move || {
            blocker.notify();
        });
        clone.block(Duration::from_secs(3));
        assert!(now() - start < 1_000_000_000);
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test() {
        assert!(current_kernel_version() > kernel_version(2, 7, 0))
    }
}