Skip to main content

user_lib/
sync_lab.rs

1use crate::{
2    ClockId, TimeSpec, clock_gettime, condvar_wait, mutex_lock, semaphore_create, semaphore_down,
3    semaphore_up, trace,
4};
5use core::sync::atomic::{AtomicUsize, Ordering};
6
7/// t2l5 trace 请求:清零内核计数器。
8pub const T2L5_TRACE_RESET_METRICS: usize = 0x200;
9/// t2l5 trace 请求:读取上下文切换次数。
10pub const T2L5_TRACE_GET_CONTEXT_SWITCHES: usize = 0x201;
11/// t2l5 trace 请求:读取同步阻塞次数。
12pub const T2L5_TRACE_GET_BLOCKED_SYNC: usize = 0x202;
13/// t2l5 trace 请求:读取同步唤醒次数。
14pub const T2L5_TRACE_GET_WAKEUPS: usize = 0x203;
15
16/// 内核计数器快照。
17#[derive(Clone, Copy)]
18pub struct KernelMetricSnapshot {
19    /// 总上下文切换次数。
20    pub context_switches: usize,
21    /// 因同步原语阻塞的次数。
22    pub blocked_sync_ops: usize,
23    /// 同步原语唤醒次数。
24    pub wakeups: usize,
25}
26
27impl KernelMetricSnapshot {
28    /// 与更早快照求差。
29    pub fn diff(self, earlier: Self) -> Self {
30        Self {
31            context_switches: self.context_switches - earlier.context_switches,
32            blocked_sync_ops: self.blocked_sync_ops - earlier.blocked_sync_ops,
33            wakeups: self.wakeups - earlier.wakeups,
34        }
35    }
36}
37
38/// 共享统计器。
39pub struct AtomicStats {
40    attempts: AtomicUsize,
41    contentions: AtomicUsize,
42    acquisitions: AtomicUsize,
43    total_wait_us: AtomicUsize,
44    max_wait_us: AtomicUsize,
45    total_hold_us: AtomicUsize,
46    max_hold_us: AtomicUsize,
47    starvation: AtomicUsize,
48}
49
50/// 统计快照。
51#[derive(Clone, Copy)]
52pub struct StatsSnapshot {
53    /// 尝试次数。
54    pub attempts: usize,
55    /// 发生竞争的次数。
56    pub contentions: usize,
57    /// 成功获取次数。
58    pub acquisitions: usize,
59    /// 累计等待时间。
60    pub total_wait_us: usize,
61    /// 最大等待时间。
62    pub max_wait_us: usize,
63    /// 累计持有时间。
64    pub total_hold_us: usize,
65    /// 最大持有时间。
66    pub max_hold_us: usize,
67    /// 超过阈值的等待次数。
68    pub starvation: usize,
69}
70
71impl StatsSnapshot {
72    /// 平均等待时间。
73    pub fn avg_wait_us(&self) -> usize {
74        if self.acquisitions == 0 {
75            0
76        } else {
77            self.total_wait_us / self.acquisitions
78        }
79    }
80
81    /// 平均持有时间。
82    pub fn avg_hold_us(&self) -> usize {
83        if self.acquisitions == 0 {
84            0
85        } else {
86            self.total_hold_us / self.acquisitions
87        }
88    }
89}
90
91impl AtomicStats {
92    /// 创建空统计器。
93    pub const fn new() -> Self {
94        Self {
95            attempts: AtomicUsize::new(0),
96            contentions: AtomicUsize::new(0),
97            acquisitions: AtomicUsize::new(0),
98            total_wait_us: AtomicUsize::new(0),
99            max_wait_us: AtomicUsize::new(0),
100            total_hold_us: AtomicUsize::new(0),
101            max_hold_us: AtomicUsize::new(0),
102            starvation: AtomicUsize::new(0),
103        }
104    }
105
106    /// 记录一次获取成功。
107    pub fn record_wait(&self, wait_us: usize, contended: bool, starvation_threshold_us: usize) {
108        self.attempts.fetch_add(1, Ordering::Relaxed);
109        self.acquisitions.fetch_add(1, Ordering::Relaxed);
110        if contended {
111            self.contentions.fetch_add(1, Ordering::Relaxed);
112        }
113        self.total_wait_us.fetch_add(wait_us, Ordering::Relaxed);
114        update_max(&self.max_wait_us, wait_us);
115        if starvation_threshold_us != 0 && wait_us > starvation_threshold_us {
116            self.starvation.fetch_add(1, Ordering::Relaxed);
117        }
118    }
119
120    /// 记录一次持有时间。
121    pub fn record_hold(&self, hold_us: usize) {
122        self.total_hold_us.fetch_add(hold_us, Ordering::Relaxed);
123        update_max(&self.max_hold_us, hold_us);
124    }
125
126    /// 记录一次只关心持有时长的样本。
127    pub fn record_hold_sample(&self, hold_us: usize) {
128        self.attempts.fetch_add(1, Ordering::Relaxed);
129        self.acquisitions.fetch_add(1, Ordering::Relaxed);
130        self.record_hold(hold_us);
131    }
132
133    /// 读取快照。
134    pub fn snapshot(&self) -> StatsSnapshot {
135        StatsSnapshot {
136            attempts: self.attempts.load(Ordering::Relaxed),
137            contentions: self.contentions.load(Ordering::Relaxed),
138            acquisitions: self.acquisitions.load(Ordering::Relaxed),
139            total_wait_us: self.total_wait_us.load(Ordering::Relaxed),
140            max_wait_us: self.max_wait_us.load(Ordering::Relaxed),
141            total_hold_us: self.total_hold_us.load(Ordering::Relaxed),
142            max_hold_us: self.max_hold_us.load(Ordering::Relaxed),
143            starvation: self.starvation.load(Ordering::Relaxed),
144        }
145    }
146}
147
148/// 公平 ticket spinlock。
149pub struct TicketSpinLock {
150    next_ticket: AtomicUsize,
151    serving: AtomicUsize,
152}
153
154/// 公平读写锁。
155pub struct FairRwLock {
156    service_queue: usize,
157    resource: usize,
158    read_mutex: usize,
159    reader_count: AtomicUsize,
160}
161
162/// 读者优先读写锁。
163pub struct ReaderPreferRwLock {
164    resource: usize,
165    read_mutex: usize,
166    reader_count: AtomicUsize,
167}
168
169impl TicketSpinLock {
170    /// 创建新的 ticket spinlock。
171    pub const fn new() -> Self {
172        Self {
173            next_ticket: AtomicUsize::new(0),
174            serving: AtomicUsize::new(0),
175        }
176    }
177
178    /// 获取锁;返回值表示这次获取是否经历了竞争。
179    pub fn lock(&self) -> bool {
180        let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed);
181        let mut contended = false;
182        while self.serving.load(Ordering::Acquire) != ticket {
183            contended = true;
184            crate::sched_yield();
185        }
186        contended
187    }
188
189    /// 释放锁。
190    pub fn unlock(&self) {
191        self.serving.fetch_add(1, Ordering::Release);
192    }
193}
194
195impl FairRwLock {
196    /// 创建公平读写锁。
197    pub fn new() -> Self {
198        Self {
199            service_queue: semaphore_create(1) as usize,
200            resource: semaphore_create(1) as usize,
201            read_mutex: semaphore_create(1) as usize,
202            reader_count: AtomicUsize::new(0),
203        }
204    }
205
206    /// 读加锁。
207    pub fn read_lock(&self) -> bool {
208        let mut contended = semaphore_down_contended(self.service_queue);
209        contended |= semaphore_down_contended(self.read_mutex);
210        let readers = self.reader_count.fetch_add(1, Ordering::SeqCst);
211        if readers == 0 {
212            contended |= semaphore_down_contended(self.resource);
213        }
214        semaphore_up(self.read_mutex);
215        semaphore_up(self.service_queue);
216        contended
217    }
218
219    /// 读解锁。
220    pub fn read_unlock(&self) {
221        let _ = semaphore_down_contended(self.read_mutex);
222        let remaining = self.reader_count.fetch_sub(1, Ordering::SeqCst) - 1;
223        if remaining == 0 {
224            semaphore_up(self.resource);
225        }
226        semaphore_up(self.read_mutex);
227    }
228
229    /// 写加锁。
230    pub fn write_lock(&self) -> bool {
231        let mut contended = semaphore_down_contended(self.service_queue);
232        contended |= semaphore_down_contended(self.resource);
233        semaphore_up(self.service_queue);
234        contended
235    }
236
237    /// 写解锁。
238    pub fn write_unlock(&self) {
239        semaphore_up(self.resource);
240    }
241}
242
243impl ReaderPreferRwLock {
244    /// 创建读者优先读写锁。
245    pub fn new() -> Self {
246        Self {
247            resource: semaphore_create(1) as usize,
248            read_mutex: semaphore_create(1) as usize,
249            reader_count: AtomicUsize::new(0),
250        }
251    }
252
253    /// 读加锁。
254    pub fn read_lock(&self) -> bool {
255        let mut contended = semaphore_down_contended(self.read_mutex);
256        let readers = self.reader_count.fetch_add(1, Ordering::SeqCst);
257        if readers == 0 {
258            contended |= semaphore_down_contended(self.resource);
259        }
260        semaphore_up(self.read_mutex);
261        contended
262    }
263
264    /// 读解锁。
265    pub fn read_unlock(&self) {
266        let _ = semaphore_down_contended(self.read_mutex);
267        let remaining = self.reader_count.fetch_sub(1, Ordering::SeqCst) - 1;
268        if remaining == 0 {
269            semaphore_up(self.resource);
270        }
271        semaphore_up(self.read_mutex);
272    }
273
274    /// 写加锁。
275    pub fn write_lock(&self) -> bool {
276        semaphore_down_contended(self.resource)
277    }
278
279    /// 写解锁。
280    pub fn write_unlock(&self) {
281        semaphore_up(self.resource);
282    }
283}
284
285/// 读取当前时间(微秒)。
286pub fn now_us() -> usize {
287    let mut time = TimeSpec::ZERO;
288    clock_gettime(ClockId::CLOCK_MONOTONIC, &mut time as *mut _ as _);
289    time.tv_sec * 1_000_000 + time.tv_nsec / 1_000
290}
291
292/// 清零内核计数器。
293pub fn reset_kernel_metrics() {
294    let _ = trace(T2L5_TRACE_RESET_METRICS, 0, 0);
295}
296
297/// 读取内核计数器。
298pub fn kernel_metrics() -> KernelMetricSnapshot {
299    KernelMetricSnapshot {
300        context_switches: trace(T2L5_TRACE_GET_CONTEXT_SWITCHES, 0, 0) as usize,
301        blocked_sync_ops: trace(T2L5_TRACE_GET_BLOCKED_SYNC, 0, 0) as usize,
302        wakeups: trace(T2L5_TRACE_GET_WAKEUPS, 0, 0) as usize,
303    }
304}
305
306/// 包装 `mutex_lock`,返回是否发生过阻塞。
307pub fn mutex_lock_contended(mutex_id: usize) -> bool {
308    match mutex_lock(mutex_id) {
309        0 => false,
310        -1 => true,
311        ret => panic!("unexpected mutex_lock return value: {ret}"),
312    }
313}
314
315/// 包装 `semaphore_down`,返回是否发生过阻塞。
316pub fn semaphore_down_contended(sem_id: usize) -> bool {
317    match semaphore_down(sem_id) {
318        0 => false,
319        -1 => true,
320        ret => panic!("unexpected semaphore_down return value: {ret}"),
321    }
322}
323
324/// 包装 `condvar_wait`,返回是否发生过阻塞。
325pub fn condvar_wait_contended(condvar_id: usize, mutex_id: usize) -> bool {
326    match condvar_wait(condvar_id, mutex_id) {
327        -1 => true,
328        0 => false,
329        ret => panic!("unexpected condvar_wait return value: {ret}"),
330    }
331}
332
333fn update_max(target: &AtomicUsize, value: usize) {
334    let mut current = target.load(Ordering::Relaxed);
335    while value > current {
336        match target.compare_exchange(current, value, Ordering::Relaxed, Ordering::Relaxed) {
337            Ok(_) => break,
338            Err(actual) => current = actual,
339        }
340    }
341}