Skip to main content

oxigdal_embedded/
sync.rs

1//! Synchronization primitives for embedded systems
2
3use crate::error::{EmbeddedError, Result};
4use core::cell::UnsafeCell;
5use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
6use portable_atomic::AtomicI32;
7
8/// Atomic counter for statistics and monitoring
9pub struct AtomicCounter {
10    value: AtomicU64,
11}
12
13impl AtomicCounter {
14    /// Create a new counter with initial value
15    pub const fn new(initial: u64) -> Self {
16        Self {
17            value: AtomicU64::new(initial),
18        }
19    }
20
21    /// Increment the counter
22    pub fn increment(&self) -> u64 {
23        self.value.fetch_add(1, Ordering::Relaxed)
24    }
25
26    /// Decrement the counter
27    pub fn decrement(&self) -> u64 {
28        self.value.fetch_sub(1, Ordering::Relaxed)
29    }
30
31    /// Add to the counter
32    pub fn add(&self, val: u64) -> u64 {
33        self.value.fetch_add(val, Ordering::Relaxed)
34    }
35
36    /// Get current value
37    pub fn get(&self) -> u64 {
38        self.value.load(Ordering::Relaxed)
39    }
40
41    /// Set value
42    pub fn set(&self, val: u64) {
43        self.value.store(val, Ordering::Relaxed);
44    }
45
46    /// Reset to zero
47    pub fn reset(&self) {
48        self.value.store(0, Ordering::Relaxed);
49    }
50
51    /// Compare and swap
52    pub fn compare_and_swap(&self, current: u64, new: u64) -> Result<u64> {
53        match self
54            .value
55            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
56        {
57            Ok(val) => Ok(val),
58            Err(_) => Err(EmbeddedError::ResourceBusy),
59        }
60    }
61}
62
63impl Default for AtomicCounter {
64    fn default() -> Self {
65        Self::new(0)
66    }
67}
68
69/// Spinlock for mutual exclusion
70pub struct Spinlock {
71    locked: AtomicBool,
72}
73
74impl Spinlock {
75    /// Create a new unlocked spinlock
76    pub const fn new() -> Self {
77        Self {
78            locked: AtomicBool::new(false),
79        }
80    }
81
82    /// Try to acquire the lock without blocking
83    pub fn try_lock(&self) -> Result<()> {
84        match self
85            .locked
86            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
87        {
88            Ok(_) => Ok(()),
89            Err(_) => Err(EmbeddedError::ResourceBusy),
90        }
91    }
92
93    /// Acquire the lock (spinning until available)
94    pub fn lock(&self) {
95        while self
96            .locked
97            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
98            .is_err()
99        {
100            // Spin with hint to reduce power consumption
101            core::hint::spin_loop();
102        }
103    }
104
105    /// Release the lock
106    ///
107    /// # Safety
108    ///
109    /// Must be called by the thread that acquired the lock
110    pub unsafe fn unlock(&self) {
111        self.locked.store(false, Ordering::Release);
112    }
113
114    /// Check if locked
115    pub fn is_locked(&self) -> bool {
116        self.locked.load(Ordering::Relaxed)
117    }
118}
119
120impl Default for Spinlock {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126/// RAII guard for spinlock
127pub struct SpinlockGuard<'a> {
128    lock: &'a Spinlock,
129}
130
131impl<'a> Drop for SpinlockGuard<'a> {
132    fn drop(&mut self) {
133        // SAFETY: Guard owns the lock
134        unsafe {
135            self.lock.unlock();
136        }
137    }
138}
139
140impl Spinlock {
141    /// Acquire lock and return RAII guard
142    pub fn lock_guard(&self) -> SpinlockGuard<'_> {
143        self.lock();
144        SpinlockGuard { lock: self }
145    }
146}
147
148/// Simple mutex using spinlock
149pub struct Mutex<T> {
150    lock: Spinlock,
151    data: UnsafeCell<T>,
152}
153
154impl<T> Mutex<T> {
155    /// Create a new mutex
156    pub const fn new(data: T) -> Self {
157        Self {
158            lock: Spinlock::new(),
159            data: UnsafeCell::new(data),
160        }
161    }
162
163    /// Try to lock and get access to data
164    pub fn try_lock(&self) -> Result<MutexGuard<'_, T>> {
165        self.lock.try_lock()?;
166        Ok(MutexGuard { mutex: self })
167    }
168
169    /// Lock and get access to data (blocking)
170    pub fn lock(&self) -> MutexGuard<'_, T> {
171        self.lock.lock();
172        MutexGuard { mutex: self }
173    }
174
175    /// Get a mutable reference (when we have exclusive access)
176    pub fn get_mut(&mut self) -> &mut T {
177        self.data.get_mut()
178    }
179}
180
181// SAFETY: Mutex provides exclusive access through locking
182unsafe impl<T: Send> Send for Mutex<T> {}
183unsafe impl<T: Send> Sync for Mutex<T> {}
184
185/// RAII guard for mutex
186pub struct MutexGuard<'a, T> {
187    mutex: &'a Mutex<T>,
188}
189
190impl<'a, T> core::ops::Deref for MutexGuard<'a, T> {
191    type Target = T;
192
193    fn deref(&self) -> &Self::Target {
194        // SAFETY: Guard holds the lock
195        unsafe { &*self.mutex.data.get() }
196    }
197}
198
199impl<'a, T> core::ops::DerefMut for MutexGuard<'a, T> {
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        // SAFETY: Guard holds the lock
202        unsafe { &mut *self.mutex.data.get() }
203    }
204}
205
206impl<'a, T> Drop for MutexGuard<'a, T> {
207    fn drop(&mut self) {
208        // SAFETY: Guard owns the lock
209        unsafe {
210            self.mutex.lock.unlock();
211        }
212    }
213}
214
215/// Semaphore for resource counting
216pub struct Semaphore {
217    count: AtomicI32,
218}
219
220impl Semaphore {
221    /// Create a new semaphore with initial count
222    pub const fn new(count: i32) -> Self {
223        Self {
224            count: AtomicI32::new(count),
225        }
226    }
227
228    /// Try to acquire (decrement count)
229    pub fn try_acquire(&self) -> Result<()> {
230        let current = self.count.load(Ordering::Acquire);
231        if current <= 0 {
232            return Err(EmbeddedError::ResourceBusy);
233        }
234
235        match self
236            .count
237            .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Acquire)
238        {
239            Ok(_) => Ok(()),
240            Err(_) => Err(EmbeddedError::ResourceBusy),
241        }
242    }
243
244    /// Acquire (blocking)
245    pub fn acquire(&self) {
246        loop {
247            if self.try_acquire().is_ok() {
248                return;
249            }
250            core::hint::spin_loop();
251        }
252    }
253
254    /// Release (increment count)
255    pub fn release(&self) {
256        self.count.fetch_add(1, Ordering::Release);
257    }
258
259    /// Get current count
260    pub fn count(&self) -> i32 {
261        self.count.load(Ordering::Relaxed)
262    }
263}
264
/// Spin-waiting rendezvous point for a fixed number of threads/tasks.
///
/// `count` tracks how many callers have arrived in the current round, and
/// `generation` is bumped once per completed round to release the waiters.
pub struct Barrier {
    threshold: u32,
    count: AtomicU32,
    generation: AtomicU32,
}

impl Barrier {
    /// Builds a barrier that opens once `threshold` callers have arrived.
    pub const fn new(threshold: u32) -> Self {
        let count = AtomicU32::new(0);
        let generation = AtomicU32::new(0);
        Self {
            threshold,
            count,
            generation,
        }
    }

    /// Blocks (by spinning) until `threshold` callers have reached the barrier.
    ///
    /// Returns `true` for the caller whose arrival completed the round and
    /// `false` for every caller that had to wait.
    pub fn wait(&self) -> bool {
        // Remember the round we arrived in before announcing ourselves.
        let entry_generation = self.generation.load(Ordering::Acquire);
        let arrived = self.count.fetch_add(1, Ordering::AcqRel) + 1;

        if arrived < self.threshold {
            // Not everyone is here yet: spin until the round counter moves on.
            while self.generation.load(Ordering::Acquire) == entry_generation {
                core::hint::spin_loop();
            }
            return false;
        }

        // Last arrival: reset the arrival count first, then open the barrier
        // by advancing the generation.
        self.count.store(0, Ordering::Release);
        self.generation.fetch_add(1, Ordering::Release);
        true
    }

    /// Number of arrivals required to open the barrier.
    pub const fn threshold(&self) -> u32 {
        self.threshold
    }
}
306
/// One-time initialization gate.
///
/// The state moves `ONCE_INCOMPLETE` -> `ONCE_RUNNING` -> `ONCE_COMPLETE`.
/// Only the caller that wins the first transition runs its closure.
pub struct Once {
    state: AtomicU32,
}

/// No caller has started the initialization yet.
const ONCE_INCOMPLETE: u32 = 0;
/// Some caller is currently executing the initialization closure.
const ONCE_RUNNING: u32 = 1;
/// The initialization closure has returned.
const ONCE_COMPLETE: u32 = 2;

impl Once {
    /// Builds a gate in the "not yet run" state.
    pub const fn new() -> Self {
        let state = AtomicU32::new(ONCE_INCOMPLETE);
        Self { state }
    }

    /// Runs `f` if, and only if, no other call has claimed this gate.
    ///
    /// Callers that arrive while another caller is still running its closure
    /// spin until that closure has returned. Callers that arrive afterwards
    /// return immediately and their `f` is dropped without being called.
    ///
    /// The state is marked complete only after `f` returns. If `f` unwinds,
    /// or calls `call_once` on the same gate, the state stays at
    /// `ONCE_RUNNING` and waiting callers keep spinning.
    pub fn call_once<F>(&self, f: F)
    where
        F: FnOnce(),
    {
        // Fast path: initialization already finished.
        if self.is_complete() {
            return;
        }

        let claim = self.state.compare_exchange(
            ONCE_INCOMPLETE,
            ONCE_RUNNING,
            Ordering::AcqRel,
            Ordering::Acquire,
        );

        let observed = match claim {
            Ok(_) => {
                // We won the race: run the closure, then publish completion.
                f();
                self.state.store(ONCE_COMPLETE, Ordering::Release);
                return;
            }
            Err(other) => other,
        };

        if observed == ONCE_RUNNING {
            // Another caller is running the closure; wait for it to finish.
            while !self.is_complete() {
                core::hint::spin_loop();
            }
        }
        // Any other observed value (already complete, or an unexpected state)
        // needs no further action.
    }

    /// Reports whether the initialization closure has finished.
    pub fn is_complete(&self) -> bool {
        self.state.load(Ordering::Acquire) == ONCE_COMPLETE
    }
}

impl Default for Once {
    /// Same as [`Once::new`]: a gate that has not run yet.
    fn default() -> Self {
        Self::new()
    }
}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Increment, decrement and reset are reflected by `get`.
    #[test]
    fn test_atomic_counter() {
        let stats = AtomicCounter::new(0);
        assert_eq!(stats.get(), 0);

        for _ in 0..2 {
            stats.increment();
        }
        assert_eq!(stats.get(), 2);

        stats.decrement();
        assert_eq!(stats.get(), 1);

        stats.reset();
        assert_eq!(stats.get(), 0);
    }

    /// A held spinlock rejects `try_lock` until it is unlocked again.
    #[test]
    fn test_spinlock() {
        let spin = Spinlock::new();
        assert!(!spin.is_locked());

        spin.try_lock().expect("lock failed");
        assert!(spin.is_locked());
        assert!(spin.try_lock().is_err());

        // SAFETY: the lock was acquired by this test a few lines above.
        unsafe {
            spin.unlock();
        }
        assert!(!spin.is_locked());
    }

    /// Writes made through one guard are visible through the next one.
    #[test]
    fn test_mutex() {
        let shared = Mutex::new(42);
        assert_eq!(*shared.lock(), 42);

        *shared.lock() = 100;

        let reader = shared.lock();
        assert_eq!(*reader, 100);
    }

    /// Permits are handed out until the count hits zero; release returns one.
    #[test]
    fn test_semaphore() {
        let permits = Semaphore::new(2);
        assert_eq!(permits.count(), 2);

        for remaining in [1, 0] {
            permits.try_acquire().expect("acquire failed");
            assert_eq!(permits.count(), remaining);
        }

        assert!(permits.try_acquire().is_err());

        permits.release();
        assert_eq!(permits.count(), 1);
    }

    /// Only the first `call_once` runs its closure.
    #[test]
    fn test_once() {
        let gate = Once::new();
        let runs = AtomicCounter::new(0);

        for _ in 0..2 {
            gate.call_once(|| {
                runs.increment();
            });
        }

        assert_eq!(runs.get(), 1);
        assert!(gate.is_complete());
    }
}